client_bulk.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  // BulkAttachResult is the report returned by BulkAttach. Entries are appended
  // while the target inbounds are processed one after another, so the same email
  // can appear once per inbound.
  type BulkAttachResult struct {
  	// Attached holds the email of every client added to a target inbound.
  	Attached []string `json:"attached"`
  	// Skipped holds the email of every client already present on a target inbound.
  	Skipped []string `json:"skipped"`
  	// Errors holds one formatted message per failed record lookup, inbound or client.
  	Errors []string `json:"errors"`
  }
  22. // BulkAttach attaches the given existing clients (by email) to each target inbound,
  23. // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
  24. // all clients to a target in a single AddInboundClient call, and reports clients already
  25. // present on a target as skipped.
  26. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
  27. result := &BulkAttachResult{}
  28. if len(emails) == 0 || len(inboundIds) == 0 {
  29. return result, false, nil
  30. }
  31. recordErr := func(format string, args ...any) {
  32. msg := fmt.Sprintf(format, args...)
  33. result.Errors = append(result.Errors, msg)
  34. logger.Warningf("[BulkAttach] %s", msg)
  35. }
  36. records := make([]*model.ClientRecord, 0, len(emails))
  37. seenEmail := make(map[string]struct{}, len(emails))
  38. for _, email := range emails {
  39. if email == "" {
  40. continue
  41. }
  42. key := strings.ToLower(email)
  43. if _, ok := seenEmail[key]; ok {
  44. continue
  45. }
  46. seenEmail[key] = struct{}{}
  47. rec, err := s.GetRecordByEmail(nil, email)
  48. if err != nil {
  49. recordErr("%s: %v", email, err)
  50. continue
  51. }
  52. records = append(records, rec)
  53. }
  54. emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
  55. if sidErr != nil {
  56. emailSubIDs = nil
  57. logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
  58. }
  59. needRestart := false
  60. for _, ibId := range inboundIds {
  61. inbound, err := inboundSvc.GetInbound(ibId)
  62. if err != nil {
  63. recordErr("inbound %d: %v", ibId, err)
  64. continue
  65. }
  66. existingClients, err := inboundSvc.GetClients(inbound)
  67. if err != nil {
  68. recordErr("inbound %d: %v", ibId, err)
  69. continue
  70. }
  71. have := make(map[string]struct{}, len(existingClients))
  72. for _, c := range existingClients {
  73. have[strings.ToLower(c.Email)] = struct{}{}
  74. }
  75. clientsToAdd := make([]model.Client, 0, len(records))
  76. for _, rec := range records {
  77. if _, attached := have[strings.ToLower(rec.Email)]; attached {
  78. result.Skipped = append(result.Skipped, rec.Email)
  79. continue
  80. }
  81. client := *rec.ToClient()
  82. client.UpdatedAt = time.Now().UnixMilli()
  83. if err := s.fillProtocolDefaults(&client, inbound); err != nil {
  84. recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
  85. continue
  86. }
  87. clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
  88. }
  89. if len(clientsToAdd) == 0 {
  90. continue
  91. }
  92. payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
  93. if err != nil {
  94. recordErr("inbound %d: %v", ibId, err)
  95. continue
  96. }
  97. nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  98. if err != nil {
  99. recordErr("inbound %d: %v", ibId, err)
  100. continue
  101. }
  102. if nr {
  103. needRestart = true
  104. }
  105. for _, c := range clientsToAdd {
  106. result.Attached = append(result.Attached, c.Email)
  107. }
  108. }
  109. return result, needRestart, nil
  110. }
  // BulkDetachResult is the report returned by BulkDetach.
  type BulkDetachResult struct {
  	// Detached holds the emails removed from the requested inbounds without
  	// any of their inbound updates failing.
  	Detached []string `json:"detached"`
  	// Skipped holds the emails that were attached to none of the requested inbounds.
  	Skipped []string `json:"skipped"`
  	// Errors holds one formatted message per failed record lookup or inbound.
  	Errors []string `json:"errors"`
  }
  117. // BulkDetach detaches the given existing clients (by email) from each target inbound.
  118. // (email, inbound) pairs where the client is not currently attached are silently skipped
  119. // at the inbound level; emails that aren't attached to any of the requested inbounds
  120. // are reported under skipped. ClientRecord rows are kept even when they become orphaned
  121. // (matches single-client detach semantics); callers should use bulkDelete for full removal.
  122. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
  123. result := &BulkDetachResult{}
  124. if len(emails) == 0 || len(inboundIds) == 0 {
  125. return result, false, nil
  126. }
  127. recordErr := func(format string, args ...any) {
  128. msg := fmt.Sprintf(format, args...)
  129. result.Errors = append(result.Errors, msg)
  130. logger.Warningf("[BulkDetach] %s", msg)
  131. }
  132. requested := make(map[int]struct{}, len(inboundIds))
  133. for _, id := range inboundIds {
  134. requested[id] = struct{}{}
  135. }
  136. recsByInbound := make(map[int][]*model.ClientRecord)
  137. emailOrder := make([]string, 0, len(emails))
  138. emailRepr := make(map[string]string, len(emails))
  139. emailFailed := make(map[string]bool, len(emails))
  140. seenEmail := make(map[string]struct{}, len(emails))
  141. for _, email := range emails {
  142. if email == "" {
  143. continue
  144. }
  145. key := strings.ToLower(email)
  146. if _, ok := seenEmail[key]; ok {
  147. continue
  148. }
  149. seenEmail[key] = struct{}{}
  150. rec, err := s.GetRecordByEmail(nil, email)
  151. if err != nil {
  152. recordErr("%s: %v", email, err)
  153. continue
  154. }
  155. currentIds, err := s.GetInboundIdsForRecord(rec.Id)
  156. if err != nil {
  157. recordErr("%s: %v", email, err)
  158. continue
  159. }
  160. matched := false
  161. for _, id := range currentIds {
  162. if _, ok := requested[id]; ok {
  163. recsByInbound[id] = append(recsByInbound[id], rec)
  164. matched = true
  165. }
  166. }
  167. if !matched {
  168. result.Skipped = append(result.Skipped, rec.Email)
  169. continue
  170. }
  171. emailOrder = append(emailOrder, key)
  172. emailRepr[key] = rec.Email
  173. }
  174. needRestart := false
  175. for _, ibId := range inboundIds {
  176. recs, ok := recsByInbound[ibId]
  177. if !ok {
  178. continue
  179. }
  180. delete(recsByInbound, ibId)
  181. nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
  182. if err != nil {
  183. recordErr("inbound %d: %v", ibId, err)
  184. for _, rec := range recs {
  185. emailFailed[strings.ToLower(rec.Email)] = true
  186. }
  187. continue
  188. }
  189. if nr {
  190. needRestart = true
  191. }
  192. }
  193. for _, key := range emailOrder {
  194. if emailFailed[key] {
  195. continue
  196. }
  197. result.Detached = append(result.Detached, emailRepr[key])
  198. }
  199. return result, needRestart, nil
  200. }
  // BulkAdjustResult is what BulkAdjust returns: the number of clients that were
  // updated, plus one report per client that was skipped (typically because the
  // field being adjusted was unlimited for that client) or failed.
  type BulkAdjustResult struct {
  	// Adjusted counts the clients for which a change was applied.
  	Adjusted int `json:"adjusted"`
  	// Skipped explains, per email, why a client was not (fully) adjusted.
  	// It is omitted from the JSON output when empty.
  	Skipped []BulkAdjustReport `json:"skipped,omitempty"`
  }

  // BulkAdjustReport pairs a client email with the reason it was skipped.
  type BulkAdjustReport struct {
  	Email  string `json:"email"`
  	Reason string `json:"reason"`
  }
  212. type bulkAdjustEntry struct {
  213. record *model.ClientRecord
  214. applyExpiry bool
  215. newExpiry int64
  216. applyTotal bool
  217. newTotal int64
  218. }
  // bulkFlowClear is the BulkAdjust flow directive that strips the XTLS flow
  // from every selected client. The vision values are the only positive flows
  // xray accepts.
  const bulkFlowClear = "none"

  // bulkFlowAllowed is the whitelist of flow directives BulkAdjust accepts.
  // A value outside this set is treated as "" (leave flow untouched), so a
  // malformed or hostile value can never be injected into a client's settings.
  // The dropdown in ClientBulkAdjustModal.tsx offers the same set
  // ("" / "none" / TLS_FLOW_CONTROL); keep the two in sync.
  var bulkFlowAllowed = map[string]struct{}{
  	// Directives that do not set a flow: leave untouched, or clear.
  	"":            {},
  	bulkFlowClear: {},
  	// Vision flows that can be set on a client.
  	"xtls-rprx-vision-udp443": {},
  	"xtls-rprx-vision":        {},
  }
  233. // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
  234. // for every email in the list. Clients whose corresponding field is
  235. // unlimited (0) are skipped — bulk extend should not accidentally
  236. // limit an unlimited client. addDays and addBytes may be negative.
  237. //
  238. // Like BulkDelete, the work is grouped by inbound so each inbound's
  239. // settings JSON is parsed and written exactly once regardless of how
  240. // many target emails it contains.
  241. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64, flow string) (BulkAdjustResult, bool, error) {
  242. result := BulkAdjustResult{}
  243. if len(emails) == 0 {
  244. return result, false, nil
  245. }
  246. flow = strings.TrimSpace(flow)
  247. if _, ok := bulkFlowAllowed[flow]; !ok {
  248. flow = "" // ignore unknown directives — "" means "leave flow untouched"
  249. }
  250. adjustFlow := flow != ""
  251. if addDays == 0 && addBytes == 0 && !adjustFlow {
  252. return result, false, common.NewError("no adjustment specified")
  253. }
  254. addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
  255. seen := map[string]struct{}{}
  256. cleanEmails := make([]string, 0, len(emails))
  257. for _, e := range emails {
  258. e = strings.TrimSpace(e)
  259. if e == "" {
  260. continue
  261. }
  262. if _, ok := seen[e]; ok {
  263. continue
  264. }
  265. seen[e] = struct{}{}
  266. cleanEmails = append(cleanEmails, e)
  267. }
  268. if len(cleanEmails) == 0 {
  269. return result, false, nil
  270. }
  271. db := database.GetDB()
  272. var records []model.ClientRecord
  273. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  274. var rows []model.ClientRecord
  275. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  276. return result, false, err
  277. }
  278. records = append(records, rows...)
  279. }
  280. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  281. for i := range records {
  282. recordsByEmail[records[i].Email] = &records[i]
  283. }
  284. skippedReasons := map[string]string{}
  285. for _, email := range cleanEmails {
  286. if _, ok := recordsByEmail[email]; !ok {
  287. skippedReasons[email] = "client not found"
  288. }
  289. }
  290. plan := map[string]*bulkAdjustEntry{}
  291. for email, rec := range recordsByEmail {
  292. entry := &bulkAdjustEntry{record: rec}
  293. if addDays != 0 {
  294. switch {
  295. case rec.ExpiryTime == 0:
  296. if _, exists := skippedReasons[email]; !exists {
  297. skippedReasons[email] = "unlimited expiry"
  298. }
  299. case rec.ExpiryTime > 0:
  300. next := rec.ExpiryTime + addExpiryMs
  301. if next <= 0 {
  302. if _, exists := skippedReasons[email]; !exists {
  303. skippedReasons[email] = "reduction exceeds remaining time"
  304. }
  305. } else {
  306. entry.applyExpiry = true
  307. entry.newExpiry = next
  308. }
  309. default:
  310. next := rec.ExpiryTime - addExpiryMs
  311. if next >= 0 {
  312. if _, exists := skippedReasons[email]; !exists {
  313. skippedReasons[email] = "reduction exceeds delay window"
  314. }
  315. } else {
  316. entry.applyExpiry = true
  317. entry.newExpiry = next
  318. }
  319. }
  320. }
  321. if addBytes != 0 {
  322. if rec.TotalGB == 0 {
  323. if _, exists := skippedReasons[email]; !exists {
  324. skippedReasons[email] = "unlimited traffic"
  325. }
  326. } else {
  327. next := max(rec.TotalGB+addBytes, 0)
  328. entry.applyTotal = true
  329. entry.newTotal = next
  330. }
  331. }
  332. if entry.applyExpiry || entry.applyTotal || adjustFlow {
  333. plan[email] = entry
  334. }
  335. }
  336. if len(plan) == 0 {
  337. for email, reason := range skippedReasons {
  338. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  339. }
  340. return result, false, nil
  341. }
  342. plannedIds := make([]int, 0, len(plan))
  343. recordIdToEmail := make(map[int]string, len(plan))
  344. for email, entry := range plan {
  345. plannedIds = append(plannedIds, entry.record.Id)
  346. recordIdToEmail[entry.record.Id] = email
  347. }
  348. var mappings []model.ClientInbound
  349. for _, batch := range chunkInts(plannedIds, sqlInChunk) {
  350. var rows []model.ClientInbound
  351. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  352. return result, false, err
  353. }
  354. mappings = append(mappings, rows...)
  355. }
  356. emailsByInbound := map[int][]string{}
  357. for _, m := range mappings {
  358. email, ok := recordIdToEmail[m.ClientId]
  359. if !ok {
  360. continue
  361. }
  362. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  363. }
  364. needRestart := false
  365. flowHonored := map[string]bool{}
  366. flowIneligible := map[string]bool{}
  367. for inboundId, ibEmails := range emailsByInbound {
  368. ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan, flow)
  369. if ibRes.needRestart {
  370. needRestart = true
  371. }
  372. for email := range ibRes.flowHonored {
  373. flowHonored[email] = true
  374. }
  375. for email := range ibRes.flowIneligible {
  376. flowIneligible[email] = true
  377. }
  378. for email, reason := range ibRes.perEmailSkipped {
  379. if _, already := skippedReasons[email]; !already {
  380. skippedReasons[email] = reason
  381. }
  382. }
  383. }
  384. adjusted := map[string]struct{}{}
  385. for email, entry := range plan {
  386. if _, skipped := skippedReasons[email]; skipped {
  387. continue
  388. }
  389. updates := map[string]any{}
  390. if entry.applyExpiry {
  391. updates["expiry_time"] = entry.newExpiry
  392. }
  393. if entry.applyTotal {
  394. updates["total"] = entry.newTotal
  395. }
  396. if len(updates) > 0 {
  397. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
  398. if _, already := skippedReasons[email]; !already {
  399. skippedReasons[email] = err.Error()
  400. }
  401. continue
  402. }
  403. }
  404. // Counted when expiry/total changed, or a flow directive was honored
  405. // for this client (flow lives in the inbound JSON, not ClientTraffic).
  406. if len(updates) > 0 || flowHonored[email] {
  407. adjusted[email] = struct{}{}
  408. }
  409. }
  410. result.Adjusted = len(adjusted)
  411. for email, reason := range skippedReasons {
  412. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  413. }
  414. // Report a flow directive that no inbound could carry — only when it was not
  415. // honored anywhere and the client has no other (expiry/total) skip reason.
  416. // The expiry/total part, if any, has already been applied and counted above.
  417. for email := range flowIneligible {
  418. if flowHonored[email] {
  419. continue
  420. }
  421. if _, already := skippedReasons[email]; already {
  422. continue
  423. }
  424. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "flow not supported on inbound"})
  425. }
  426. return result, needRestart, nil
  427. }
  // bulkInboundAdjustResult is the per-inbound outcome that
  // bulkAdjustInboundClients hands back to BulkAdjust.
  type bulkInboundAdjustResult struct {
  	// perEmailSkipped maps an email to the reason it could not be processed
  	// on this inbound.
  	perEmailSkipped map[string]string
  	// flowHonored marks the emails for which the flow directive was accepted
  	// on this inbound.
  	flowHonored map[string]bool
  	// flowIneligible is tracked apart from perEmailSkipped: a flow directive
  	// that an inbound cannot carry must not suppress the expiry/total write for
  	// the same client (which would diverge the inbound JSON / ClientRecord from
  	// ClientTraffic). It only feeds the final Skipped report.
  	flowIneligible map[string]bool
  	// needRestart is set when the change on this inbound calls for a restart.
  	needRestart bool
  }
  438. // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
  439. // inside a single inbound's settings JSON. The xray runtime is updated
  440. // only for remote-node inbounds; local nodes do not need a notification
  441. // because the AddUser payload does not include totalGB/expiryTime —
  442. // changing those fields is identity-preserving and the panel's traffic
  443. // enforcement loop picks up the new limits from ClientTraffic directly.
  444. func (s *ClientService) bulkAdjustInboundClients(
  445. inboundSvc *InboundService,
  446. inboundId int,
  447. emails []string,
  448. plan map[string]*bulkAdjustEntry,
  449. flow string,
  450. ) bulkInboundAdjustResult {
  451. res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}, flowHonored: map[string]bool{}, flowIneligible: map[string]bool{}}
  452. defer lockInbound(inboundId).Unlock()
  453. oldInbound, err := inboundSvc.GetInbound(inboundId)
  454. if err != nil {
  455. logger.Error("Load Old Data Error")
  456. for _, e := range emails {
  457. res.perEmailSkipped[e] = err.Error()
  458. }
  459. return res
  460. }
  461. var settings map[string]any
  462. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  463. for _, e := range emails {
  464. res.perEmailSkipped[e] = err.Error()
  465. }
  466. return res
  467. }
  468. // Match by email — the client's stable identity (see Delete). Credentials
  469. // can drift from the inbound JSON, so they are never used for matching.
  470. wantedEmails := make(map[string]struct{}, len(emails))
  471. for _, email := range emails {
  472. if plan[email] == nil {
  473. res.perEmailSkipped[email] = "client not found"
  474. continue
  475. }
  476. wantedEmails[email] = struct{}{}
  477. }
  478. // Flow eligibility is a property of the inbound (protocol + transport), so
  479. // resolve it once. Clearing flow is always allowed; setting a vision flow
  480. // is only honored on an inbound that can carry it.
  481. flowEligible := flow == bulkFlowClear ||
  482. inboundCanEnableTlsFlow(string(oldInbound.Protocol), oldInbound.StreamSettings, oldInbound.Settings)
  483. interfaceClients, _ := settings["clients"].([]any)
  484. foundEmails := map[string]bool{}
  485. flowChanged := false
  486. nowMs := time.Now().Unix() * 1000
  487. for i, client := range interfaceClients {
  488. c, ok := client.(map[string]any)
  489. if !ok {
  490. continue
  491. }
  492. targetEmail, _ := c["email"].(string)
  493. if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
  494. continue
  495. }
  496. entry := plan[targetEmail]
  497. if entry.applyExpiry {
  498. c["expiryTime"] = entry.newExpiry
  499. }
  500. if entry.applyTotal {
  501. c["totalGB"] = entry.newTotal
  502. }
  503. if flow != "" {
  504. if flowEligible {
  505. want := ""
  506. if flow != bulkFlowClear {
  507. want = flow
  508. }
  509. if cur, _ := c["flow"].(string); cur != want {
  510. c["flow"] = want
  511. flowChanged = true
  512. }
  513. res.flowHonored[targetEmail] = true
  514. } else {
  515. // Record separately so this never suppresses the expiry/total
  516. // write for the same client (see flowIneligible doc).
  517. res.flowIneligible[targetEmail] = true
  518. }
  519. }
  520. c["updated_at"] = nowMs
  521. interfaceClients[i] = c
  522. foundEmails[targetEmail] = true
  523. }
  524. for email := range wantedEmails {
  525. if !foundEmails[email] {
  526. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  527. }
  528. }
  529. if len(foundEmails) == 0 {
  530. return res
  531. }
  532. settings["clients"] = interfaceClients
  533. newSettings, err := json.MarshalIndent(settings, "", " ")
  534. if err != nil {
  535. for email := range foundEmails {
  536. res.perEmailSkipped[email] = err.Error()
  537. }
  538. return res
  539. }
  540. oldInbound.Settings = string(newSettings)
  541. // A flow change rewrites the user's xray config, which the lightweight
  542. // UpdateUser push below does not carry. Local nodes reload via restart;
  543. // remote nodes get a full reconcile (MarkNodeDirty) instead of a per-user push.
  544. if flowChanged && oldInbound.NodeID == nil {
  545. res.needRestart = true
  546. }
  547. markDirty := false
  548. if oldInbound.NodeID != nil {
  549. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  550. if perr != nil {
  551. for email := range foundEmails {
  552. res.perEmailSkipped[email] = perr.Error()
  553. delete(foundEmails, email)
  554. }
  555. } else {
  556. if dirty {
  557. markDirty = true
  558. }
  559. if flowChanged {
  560. markDirty = true
  561. push = false
  562. }
  563. // Large batches collapse into one reconcile push rather than M updates.
  564. if push && len(foundEmails) > nodeBulkPushThreshold {
  565. markDirty = true
  566. push = false
  567. }
  568. if push {
  569. for email := range foundEmails {
  570. entry := plan[email]
  571. updated := *entry.record.ToClient()
  572. if entry.applyExpiry {
  573. updated.ExpiryTime = entry.newExpiry
  574. }
  575. if entry.applyTotal {
  576. updated.TotalGB = entry.newTotal
  577. }
  578. updated.UpdatedAt = nowMs
  579. if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
  580. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  581. markDirty = true
  582. }
  583. }
  584. }
  585. }
  586. }
  587. // Serialize against the traffic poll to avoid the cross-transaction
  588. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  589. txErr := runSerializedTx(func(tx *gorm.DB) error {
  590. if err := tx.Save(oldInbound).Error; err != nil {
  591. return err
  592. }
  593. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  594. if gcErr != nil {
  595. return gcErr
  596. }
  597. return s.SyncInbound(tx, inboundId, finalClients)
  598. })
  599. if txErr != nil {
  600. for email := range foundEmails {
  601. if _, skip := res.perEmailSkipped[email]; !skip {
  602. res.perEmailSkipped[email] = txErr.Error()
  603. }
  604. }
  605. } else if markDirty && oldInbound.NodeID != nil {
  606. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  607. logger.Warning("mark node dirty failed:", dErr)
  608. }
  609. }
  610. return res
  611. }
  // BulkDeleteResult mirrors BulkAdjustResult: the number of clients deleted
  // plus one report per email that could not be processed.
  type BulkDeleteResult struct {
  	// Deleted counts the clients that were removed.
  	Deleted int `json:"deleted"`
  	// Skipped explains, per email, why a client was not deleted.
  	// It is omitted from the JSON output when empty.
  	Skipped []BulkDeleteReport `json:"skipped,omitempty"`
  }

  // BulkDeleteReport pairs a client email with the reason it was skipped.
  type BulkDeleteReport struct {
  	Email  string `json:"email"`
  	Reason string `json:"reason"`
  }
  622. // BulkDelete removes every client in the list in one optimized pass.
  623. // Instead of running the full single-delete pipeline N times (which would
  624. // re-read, re-parse, and re-write each inbound's settings JSON for every
  625. // email), it groups emails by inbound and performs a single
  626. // read-modify-write per inbound. Per-row DB cleanups are also batched with
  627. // IN-clause queries at the end. Errors on a particular email are recorded
  628. // in the Skipped list and processing continues for the rest.
  629. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
  630. result := BulkDeleteResult{}
  631. seen := map[string]struct{}{}
  632. cleanEmails := make([]string, 0, len(emails))
  633. for _, e := range emails {
  634. e = strings.TrimSpace(e)
  635. if e == "" {
  636. continue
  637. }
  638. if _, ok := seen[e]; ok {
  639. continue
  640. }
  641. seen[e] = struct{}{}
  642. cleanEmails = append(cleanEmails, e)
  643. }
  644. if len(cleanEmails) == 0 {
  645. return result, false, nil
  646. }
  647. db := database.GetDB()
  648. var records []model.ClientRecord
  649. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  650. var rows []model.ClientRecord
  651. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  652. return result, false, err
  653. }
  654. records = append(records, rows...)
  655. }
  656. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  657. tombstoneEmails := make([]string, 0, len(records))
  658. for i := range records {
  659. recordsByEmail[records[i].Email] = &records[i]
  660. tombstoneEmails = append(tombstoneEmails, records[i].Email)
  661. }
  662. tombstoneClientEmails(tombstoneEmails)
  663. skippedReasons := map[string]string{}
  664. for _, email := range cleanEmails {
  665. if _, ok := recordsByEmail[email]; !ok {
  666. skippedReasons[email] = "client not found"
  667. }
  668. }
  669. clientIds := make([]int, 0, len(recordsByEmail))
  670. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  671. for _, r := range recordsByEmail {
  672. clientIds = append(clientIds, r.Id)
  673. recordIdToEmail[r.Id] = r.Email
  674. }
  675. emailsByInbound := map[int][]string{}
  676. if len(clientIds) > 0 {
  677. var mappings []model.ClientInbound
  678. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  679. var rows []model.ClientInbound
  680. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  681. return result, false, err
  682. }
  683. mappings = append(mappings, rows...)
  684. }
  685. for _, m := range mappings {
  686. email, ok := recordIdToEmail[m.ClientId]
  687. if !ok {
  688. continue
  689. }
  690. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  691. }
  692. }
  693. needRestart := false
  694. for inboundId, ibEmails := range emailsByInbound {
  695. ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
  696. if ibResult.needRestart {
  697. needRestart = true
  698. }
  699. for email, reason := range ibResult.perEmailSkipped {
  700. if _, already := skippedReasons[email]; !already {
  701. skippedReasons[email] = reason
  702. }
  703. }
  704. }
  705. successEmails := make([]string, 0, len(recordsByEmail))
  706. successIds := make([]int, 0, len(recordsByEmail))
  707. for email, rec := range recordsByEmail {
  708. if _, skipped := skippedReasons[email]; skipped {
  709. continue
  710. }
  711. successEmails = append(successEmails, email)
  712. successIds = append(successIds, rec.Id)
  713. }
  714. if len(successIds) > 0 {
  715. // Serialize the row cleanup against the traffic poll to avoid the
  716. // cross-transaction lock-order deadlock on client_traffics/inbounds.
  717. if err := runSerializedTx(func(tx *gorm.DB) error {
  718. for _, batch := range chunkInts(successIds, sqlInChunk) {
  719. if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
  720. return e
  721. }
  722. }
  723. if !keepTraffic && len(successEmails) > 0 {
  724. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  725. if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
  726. return e
  727. }
  728. if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
  729. return e
  730. }
  731. }
  732. }
  733. for _, batch := range chunkInts(successIds, sqlInChunk) {
  734. if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
  735. return e
  736. }
  737. }
  738. return nil
  739. }); err != nil {
  740. return result, needRestart, err
  741. }
  742. }
  743. result.Deleted = len(successEmails)
  744. for email, reason := range skippedReasons {
  745. result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
  746. }
  747. return result, needRestart, nil
  748. }
  749. type bulkInboundDeleteResult struct {
  750. perEmailSkipped map[string]string
  751. needRestart bool
  752. }
  753. // bulkDelInboundClients removes multiple clients from a single inbound's
  754. // settings JSON in one read-modify-write cycle, runs the xray runtime
  755. // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
  756. // holds per-email failure reasons; emails not present in the map are
  757. // considered successful for this inbound.
  758. func (s *ClientService) bulkDelInboundClients(
  759. inboundSvc *InboundService,
  760. inboundId int,
  761. emails []string,
  762. records map[string]*model.ClientRecord,
  763. keepTraffic bool,
  764. ) bulkInboundDeleteResult {
  765. res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
  766. defer lockInbound(inboundId).Unlock()
  767. oldInbound, err := inboundSvc.GetInbound(inboundId)
  768. if err != nil {
  769. logger.Error("Load Old Data Error")
  770. for _, e := range emails {
  771. res.perEmailSkipped[e] = err.Error()
  772. }
  773. return res
  774. }
  775. var settings map[string]any
  776. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  777. for _, e := range emails {
  778. res.perEmailSkipped[e] = err.Error()
  779. }
  780. return res
  781. }
  782. // Match by email — the client's stable identity (see Delete). Removes every
  783. // entry carrying a wanted email, independent of credential drift.
  784. wantedEmails := make(map[string]struct{}, len(emails))
  785. for _, email := range emails {
  786. if records[email] == nil {
  787. res.perEmailSkipped[email] = "client not found"
  788. continue
  789. }
  790. wantedEmails[email] = struct{}{}
  791. }
  792. interfaceClients, _ := settings["clients"].([]any)
  793. newClients := make([]any, 0, len(interfaceClients))
  794. foundEmails := map[string]bool{}
  795. enableByEmail := map[string]bool{}
  796. for _, client := range interfaceClients {
  797. c, ok := client.(map[string]any)
  798. if !ok {
  799. newClients = append(newClients, client)
  800. continue
  801. }
  802. em, _ := c["email"].(string)
  803. if _, found := wantedEmails[em]; found && em != "" {
  804. foundEmails[em] = true
  805. en, _ := c["enable"].(bool)
  806. enableByEmail[em] = en
  807. continue
  808. }
  809. newClients = append(newClients, client)
  810. }
  811. for email := range wantedEmails {
  812. if !foundEmails[email] {
  813. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  814. }
  815. }
  816. db := database.GetDB()
  817. newClients = compactOrphans(db, newClients)
  818. if newClients == nil {
  819. newClients = []any{}
  820. }
  821. settings["clients"] = newClients
  822. newSettings, err := json.MarshalIndent(settings, "", " ")
  823. if err != nil {
  824. for email := range foundEmails {
  825. if _, skip := res.perEmailSkipped[email]; !skip {
  826. res.perEmailSkipped[email] = err.Error()
  827. }
  828. }
  829. return res
  830. }
  831. oldInbound.Settings = string(newSettings)
  832. foundList := make([]string, 0, len(foundEmails))
  833. for email := range foundEmails {
  834. foundList = append(foundList, email)
  835. }
  836. notDepletedByEmail := map[string]bool{}
  837. if len(foundList) > 0 {
  838. type trafficRow struct {
  839. Email string
  840. Enable bool
  841. }
  842. for _, batch := range chunkStrings(foundList, sqlInChunk) {
  843. var rows []trafficRow
  844. if err := db.Model(xray.ClientTraffic{}).
  845. Where("email IN ?", batch).
  846. Select("email, enable").
  847. Scan(&rows).Error; err == nil {
  848. for _, r := range rows {
  849. notDepletedByEmail[r.Email] = r.Enable
  850. }
  851. }
  852. }
  853. }
  854. var sharedSet map[string]bool
  855. if !keepTraffic {
  856. var sharedErr error
  857. sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
  858. if sharedErr != nil {
  859. for email := range foundEmails {
  860. res.perEmailSkipped[email] = sharedErr.Error()
  861. delete(foundEmails, email)
  862. }
  863. return res
  864. }
  865. }
  866. if !keepTraffic {
  867. purge := make([]string, 0, len(foundEmails))
  868. for email := range foundEmails {
  869. if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
  870. purge = append(purge, email)
  871. }
  872. }
  873. if len(purge) > 0 {
  874. // Serialize the IP/stat purge against the traffic poll to avoid the
  875. // cross-transaction lock-order deadlock on client_traffics.
  876. if delErr := runSerializedTx(func(tx *gorm.DB) error {
  877. if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
  878. logger.Error("Error in delete client IPs")
  879. return e
  880. }
  881. if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
  882. logger.Error("Delete stats Data Error")
  883. return e
  884. }
  885. return nil
  886. }); delErr != nil {
  887. for _, email := range purge {
  888. res.perEmailSkipped[email] = delErr.Error()
  889. delete(foundEmails, email)
  890. }
  891. }
  892. }
  893. }
  894. markDirty := false
  895. if oldInbound.NodeID == nil {
  896. rt, rterr := inboundSvc.runtimeFor(oldInbound)
  897. if rterr != nil {
  898. res.needRestart = true
  899. } else {
  900. for email := range foundEmails {
  901. if !enableByEmail[email] || !notDepletedByEmail[email] {
  902. continue
  903. }
  904. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  905. if err1 == nil {
  906. logger.Debug("Client deleted on", rt.Name(), ":", email)
  907. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  908. logger.Debug("User is already deleted. Nothing to do more...")
  909. } else {
  910. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  911. res.needRestart = true
  912. }
  913. }
  914. }
  915. } else {
  916. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  917. if perr != nil {
  918. for email := range foundEmails {
  919. res.perEmailSkipped[email] = perr.Error()
  920. delete(foundEmails, email)
  921. }
  922. } else {
  923. if dirty {
  924. markDirty = true
  925. }
  926. // Large batches collapse into one reconcile push rather than M deletes.
  927. if push && len(foundEmails) > nodeBulkPushThreshold {
  928. markDirty = true
  929. push = false
  930. }
  931. if push {
  932. for email := range foundEmails {
  933. if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
  934. logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
  935. markDirty = true
  936. }
  937. }
  938. }
  939. }
  940. }
  941. // Serialize against the traffic poll to avoid the cross-transaction
  942. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  943. txErr := runSerializedTx(func(tx *gorm.DB) error {
  944. if err := tx.Save(oldInbound).Error; err != nil {
  945. return err
  946. }
  947. finalClients, err := inboundSvc.GetClients(oldInbound)
  948. if err != nil {
  949. return err
  950. }
  951. return s.SyncInbound(tx, inboundId, finalClients)
  952. })
  953. if txErr != nil {
  954. for email := range foundEmails {
  955. if _, skip := res.perEmailSkipped[email]; !skip {
  956. res.perEmailSkipped[email] = txErr.Error()
  957. }
  958. }
  959. } else if markDirty && oldInbound.NodeID != nil {
  960. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  961. logger.Warning("mark node dirty failed:", dErr)
  962. }
  963. }
  964. return res
  965. }
  966. // BulkCreateResult mirrors BulkAdjustResult for the create flow.
  967. type BulkCreateResult struct {
  968. Created int `json:"created"`
  969. Skipped []BulkCreateReport `json:"skipped,omitempty"`
  970. }
  971. type BulkCreateReport struct {
  972. Email string `json:"email"`
  973. Reason string `json:"reason"`
  974. }
  975. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
  976. result := BulkCreateResult{}
  977. if len(payloads) == 0 {
  978. return result, false, nil
  979. }
  980. skip := func(email, reason string) {
  981. if strings.TrimSpace(email) == "" {
  982. email = "(missing email)"
  983. }
  984. result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
  985. }
  986. emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
  987. if err != nil {
  988. emailSubIDs = nil
  989. }
  990. type prepared struct {
  991. client model.Client
  992. inboundIds []int
  993. }
  994. prep := make([]prepared, 0, len(payloads))
  995. emails := make([]string, 0, len(payloads))
  996. subIDs := make([]string, 0, len(payloads))
  997. seenEmail := make(map[string]struct{}, len(payloads))
  998. seenSubID := make(map[string]string, len(payloads))
  999. for i := range payloads {
  1000. client := payloads[i].Client
  1001. email := strings.TrimSpace(client.Email)
  1002. if email == "" {
  1003. skip("", "client email is required")
  1004. continue
  1005. }
  1006. if verr := validateClientEmail(email); verr != nil {
  1007. skip(email, verr.Error())
  1008. continue
  1009. }
  1010. if verr := validateClientSubID(client.SubID); verr != nil {
  1011. skip(email, verr.Error())
  1012. continue
  1013. }
  1014. if len(payloads[i].InboundIds) == 0 {
  1015. skip(email, "at least one inbound is required")
  1016. continue
  1017. }
  1018. client.Email = email
  1019. if client.SubID == "" {
  1020. client.SubID = uuid.NewString()
  1021. }
  1022. if !client.Enable {
  1023. client.Enable = true
  1024. }
  1025. now := time.Now().UnixMilli()
  1026. if client.CreatedAt == 0 {
  1027. client.CreatedAt = now
  1028. }
  1029. client.UpdatedAt = now
  1030. le := strings.ToLower(email)
  1031. if _, dup := seenEmail[le]; dup {
  1032. skip(email, "email already in use: "+email)
  1033. continue
  1034. }
  1035. if owner, ok := seenSubID[client.SubID]; ok && owner != le {
  1036. skip(email, "subId already in use: "+client.SubID)
  1037. continue
  1038. }
  1039. seenEmail[le] = struct{}{}
  1040. seenSubID[client.SubID] = le
  1041. prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
  1042. emails = append(emails, email)
  1043. subIDs = append(subIDs, client.SubID)
  1044. }
  1045. if len(prep) == 0 {
  1046. return result, false, nil
  1047. }
  1048. db := database.GetDB()
  1049. const lookupChunk = 400
  1050. existingEmailSub := make(map[string]string, len(emails))
  1051. for start := 0; start < len(emails); start += lookupChunk {
  1052. end := min(start+lookupChunk, len(emails))
  1053. var rows []model.ClientRecord
  1054. if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
  1055. return result, false, e
  1056. }
  1057. for i := range rows {
  1058. existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
  1059. }
  1060. }
  1061. existingSubOwner := make(map[string]string, len(subIDs))
  1062. for start := 0; start < len(subIDs); start += lookupChunk {
  1063. end := min(start+lookupChunk, len(subIDs))
  1064. var rows []model.ClientRecord
  1065. if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
  1066. return result, false, e
  1067. }
  1068. for i := range rows {
  1069. existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
  1070. }
  1071. }
  1072. inboundCache := make(map[int]*model.Inbound)
  1073. getIb := func(id int) (*model.Inbound, error) {
  1074. if ib, ok := inboundCache[id]; ok {
  1075. return ib, nil
  1076. }
  1077. ib, e := inboundSvc.GetInbound(id)
  1078. if e != nil {
  1079. return nil, e
  1080. }
  1081. inboundCache[id] = ib
  1082. return ib, nil
  1083. }
  1084. byInbound := make(map[int][]model.Client)
  1085. idxByInbound := make(map[int][]int)
  1086. inboundOrder := make([]int, 0)
  1087. failed := make([]bool, len(prep))
  1088. reason := make([]string, len(prep))
  1089. for idx := range prep {
  1090. le := strings.ToLower(prep[idx].client.Email)
  1091. if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
  1092. failed[idx] = true
  1093. reason[idx] = "email already in use: " + prep[idx].client.Email
  1094. continue
  1095. }
  1096. if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
  1097. failed[idx] = true
  1098. reason[idx] = "subId already in use: " + prep[idx].client.SubID
  1099. continue
  1100. }
  1101. ok := true
  1102. for _, ibId := range prep[idx].inboundIds {
  1103. ib, e := getIb(ibId)
  1104. if e != nil {
  1105. failed[idx] = true
  1106. reason[idx] = e.Error()
  1107. ok = false
  1108. break
  1109. }
  1110. if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
  1111. failed[idx] = true
  1112. reason[idx] = e.Error()
  1113. ok = false
  1114. break
  1115. }
  1116. }
  1117. if !ok {
  1118. continue
  1119. }
  1120. for _, ibId := range prep[idx].inboundIds {
  1121. ib, _ := getIb(ibId)
  1122. if _, seen := byInbound[ibId]; !seen {
  1123. inboundOrder = append(inboundOrder, ibId)
  1124. }
  1125. byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
  1126. idxByInbound[ibId] = append(idxByInbound[ibId], idx)
  1127. }
  1128. }
  1129. needRestart := false
  1130. for _, ibId := range inboundOrder {
  1131. payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
  1132. if e == nil {
  1133. var nr bool
  1134. nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  1135. if e == nil && nr {
  1136. needRestart = true
  1137. }
  1138. }
  1139. if e != nil {
  1140. for _, idx := range idxByInbound[ibId] {
  1141. failed[idx] = true
  1142. if reason[idx] == "" {
  1143. reason[idx] = e.Error()
  1144. }
  1145. }
  1146. }
  1147. }
  1148. for idx := range prep {
  1149. if failed[idx] {
  1150. skip(prep[idx].client.Email, reason[idx])
  1151. } else {
  1152. result.Created++
  1153. }
  1154. }
  1155. return result, needRestart, nil
  1156. }
  1157. func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
  1158. db := database.GetDB()
  1159. now := time.Now().UnixMilli()
  1160. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  1161. var rows []xray.ClientTraffic
  1162. if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
  1163. return 0, false, err
  1164. }
  1165. if len(rows) == 0 {
  1166. return 0, false, nil
  1167. }
  1168. seen := make(map[string]struct{}, len(rows))
  1169. emails := make([]string, 0, len(rows))
  1170. for _, r := range rows {
  1171. if r.Email == "" {
  1172. continue
  1173. }
  1174. if _, ok := seen[r.Email]; ok {
  1175. continue
  1176. }
  1177. seen[r.Email] = struct{}{}
  1178. emails = append(emails, r.Email)
  1179. }
  1180. if len(emails) == 0 {
  1181. return 0, false, nil
  1182. }
  1183. res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
  1184. if err != nil {
  1185. return res.Deleted, needRestart, err
  1186. }
  1187. return res.Deleted, needRestart, nil
  1188. }