| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "time"
- "github.com/google/uuid"
- "github.com/mhsanaei/3x-ui/v3/internal/database"
- "github.com/mhsanaei/3x-ui/v3/internal/database/model"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/mhsanaei/3x-ui/v3/internal/util/common"
- "github.com/mhsanaei/3x-ui/v3/internal/xray"
- "gorm.io/gorm"
- )
// BulkAttachResult summarizes a BulkAttach run across all target inbounds.
// An email may appear more than once when several inbounds are targeted,
// because entries are appended per (client, inbound) pair.
type BulkAttachResult struct {
	// Attached holds the emails that were added to a target inbound.
	Attached []string `json:"attached"`
	// Skipped holds the emails that were already present on a target inbound.
	Skipped []string `json:"skipped"`
	// Errors holds one formatted message per failed lookup or inbound update.
	Errors []string `json:"errors"`
}
- // BulkAttach attaches the given existing clients (by email) to each target inbound,
- // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
- // all clients to a target in a single AddInboundClient call, and reports clients already
- // present on a target as skipped.
- func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
- result := &BulkAttachResult{}
- if len(emails) == 0 || len(inboundIds) == 0 {
- return result, false, nil
- }
- recordErr := func(format string, args ...any) {
- msg := fmt.Sprintf(format, args...)
- result.Errors = append(result.Errors, msg)
- logger.Warningf("[BulkAttach] %s", msg)
- }
- records := make([]*model.ClientRecord, 0, len(emails))
- seenEmail := make(map[string]struct{}, len(emails))
- for _, email := range emails {
- if email == "" {
- continue
- }
- key := strings.ToLower(email)
- if _, ok := seenEmail[key]; ok {
- continue
- }
- seenEmail[key] = struct{}{}
- rec, err := s.GetRecordByEmail(nil, email)
- if err != nil {
- recordErr("%s: %v", email, err)
- continue
- }
- records = append(records, rec)
- }
- emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
- if sidErr != nil {
- emailSubIDs = nil
- logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
- }
- needRestart := false
- for _, ibId := range inboundIds {
- inbound, err := inboundSvc.GetInbound(ibId)
- if err != nil {
- recordErr("inbound %d: %v", ibId, err)
- continue
- }
- existingClients, err := inboundSvc.GetClients(inbound)
- if err != nil {
- recordErr("inbound %d: %v", ibId, err)
- continue
- }
- have := make(map[string]struct{}, len(existingClients))
- for _, c := range existingClients {
- have[strings.ToLower(c.Email)] = struct{}{}
- }
- clientsToAdd := make([]model.Client, 0, len(records))
- for _, rec := range records {
- if _, attached := have[strings.ToLower(rec.Email)]; attached {
- result.Skipped = append(result.Skipped, rec.Email)
- continue
- }
- client := *rec.ToClient()
- client.UpdatedAt = time.Now().UnixMilli()
- if err := s.fillProtocolDefaults(&client, inbound); err != nil {
- recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
- continue
- }
- clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
- }
- if len(clientsToAdd) == 0 {
- continue
- }
- payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
- if err != nil {
- recordErr("inbound %d: %v", ibId, err)
- continue
- }
- nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
- if err != nil {
- recordErr("inbound %d: %v", ibId, err)
- continue
- }
- if nr {
- needRestart = true
- }
- for _, c := range clientsToAdd {
- result.Attached = append(result.Attached, c.Email)
- }
- }
- return result, needRestart, nil
- }
// BulkDetachResult summarizes a BulkDetach run across all target inbounds.
type BulkDetachResult struct {
	// Detached holds the emails whose detach succeeded on every matching inbound.
	Detached []string `json:"detached"`
	// Skipped holds the emails that were attached to none of the requested inbounds.
	Skipped []string `json:"skipped"`
	// Errors holds one formatted message per failed lookup or inbound update.
	Errors []string `json:"errors"`
}
- // BulkDetach detaches the given existing clients (by email) from each target inbound.
- // (email, inbound) pairs where the client is not currently attached are silently skipped
- // at the inbound level; emails that aren't attached to any of the requested inbounds
- // are reported under skipped. ClientRecord rows are kept even when they become orphaned
- // (matches single-client detach semantics); callers should use bulkDelete for full removal.
- func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
- result := &BulkDetachResult{}
- if len(emails) == 0 || len(inboundIds) == 0 {
- return result, false, nil
- }
- recordErr := func(format string, args ...any) {
- msg := fmt.Sprintf(format, args...)
- result.Errors = append(result.Errors, msg)
- logger.Warningf("[BulkDetach] %s", msg)
- }
- requested := make(map[int]struct{}, len(inboundIds))
- for _, id := range inboundIds {
- requested[id] = struct{}{}
- }
- recsByInbound := make(map[int][]*model.ClientRecord)
- emailOrder := make([]string, 0, len(emails))
- emailRepr := make(map[string]string, len(emails))
- emailFailed := make(map[string]bool, len(emails))
- seenEmail := make(map[string]struct{}, len(emails))
- for _, email := range emails {
- if email == "" {
- continue
- }
- key := strings.ToLower(email)
- if _, ok := seenEmail[key]; ok {
- continue
- }
- seenEmail[key] = struct{}{}
- rec, err := s.GetRecordByEmail(nil, email)
- if err != nil {
- recordErr("%s: %v", email, err)
- continue
- }
- currentIds, err := s.GetInboundIdsForRecord(rec.Id)
- if err != nil {
- recordErr("%s: %v", email, err)
- continue
- }
- matched := false
- for _, id := range currentIds {
- if _, ok := requested[id]; ok {
- recsByInbound[id] = append(recsByInbound[id], rec)
- matched = true
- }
- }
- if !matched {
- result.Skipped = append(result.Skipped, rec.Email)
- continue
- }
- emailOrder = append(emailOrder, key)
- emailRepr[key] = rec.Email
- }
- needRestart := false
- for _, ibId := range inboundIds {
- recs, ok := recsByInbound[ibId]
- if !ok {
- continue
- }
- delete(recsByInbound, ibId)
- nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
- if err != nil {
- recordErr("inbound %d: %v", ibId, err)
- for _, rec := range recs {
- emailFailed[strings.ToLower(rec.Email)] = true
- }
- continue
- }
- if nr {
- needRestart = true
- }
- }
- for _, key := range emailOrder {
- if emailFailed[key] {
- continue
- }
- result.Detached = append(result.Detached, emailRepr[key])
- }
- return result, needRestart, nil
- }
type (
	// BulkAdjustResult is what BulkAdjust returns: the number of clients that
	// were updated plus one report per email that was skipped (typically
	// because the adjusted field is unlimited for that client) or failed.
	BulkAdjustResult struct {
		// Adjusted counts the clients whose traffic row was updated.
		Adjusted int `json:"adjusted"`
		// Skipped lists the emails that were not (fully) adjusted, with a reason.
		Skipped []BulkAdjustReport `json:"skipped,omitempty"`
	}

	// BulkAdjustReport pairs an email with the reason it was skipped.
	BulkAdjustReport struct {
		Email  string `json:"email"`
		Reason string `json:"reason"`
	}
)
- type bulkAdjustEntry struct {
- record *model.ClientRecord
- applyExpiry bool
- newExpiry int64
- applyTotal bool
- newTotal int64
- }
- // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
- // for every email in the list. Clients whose corresponding field is
- // unlimited (0) are skipped — bulk extend should not accidentally
- // limit an unlimited client. addDays and addBytes may be negative.
- //
- // Like BulkDelete, the work is grouped by inbound so each inbound's
- // settings JSON is parsed and written exactly once regardless of how
- // many target emails it contains.
- func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
- result := BulkAdjustResult{}
- if len(emails) == 0 {
- return result, false, nil
- }
- if addDays == 0 && addBytes == 0 {
- return result, false, common.NewError("no adjustment specified")
- }
- addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
- seen := map[string]struct{}{}
- cleanEmails := make([]string, 0, len(emails))
- for _, e := range emails {
- e = strings.TrimSpace(e)
- if e == "" {
- continue
- }
- if _, ok := seen[e]; ok {
- continue
- }
- seen[e] = struct{}{}
- cleanEmails = append(cleanEmails, e)
- }
- if len(cleanEmails) == 0 {
- return result, false, nil
- }
- db := database.GetDB()
- var records []model.ClientRecord
- for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
- var rows []model.ClientRecord
- if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
- return result, false, err
- }
- records = append(records, rows...)
- }
- recordsByEmail := make(map[string]*model.ClientRecord, len(records))
- for i := range records {
- recordsByEmail[records[i].Email] = &records[i]
- }
- skippedReasons := map[string]string{}
- for _, email := range cleanEmails {
- if _, ok := recordsByEmail[email]; !ok {
- skippedReasons[email] = "client not found"
- }
- }
- plan := map[string]*bulkAdjustEntry{}
- for email, rec := range recordsByEmail {
- entry := &bulkAdjustEntry{record: rec}
- if addDays != 0 {
- switch {
- case rec.ExpiryTime == 0:
- if _, exists := skippedReasons[email]; !exists {
- skippedReasons[email] = "unlimited expiry"
- }
- case rec.ExpiryTime > 0:
- next := rec.ExpiryTime + addExpiryMs
- if next <= 0 {
- if _, exists := skippedReasons[email]; !exists {
- skippedReasons[email] = "reduction exceeds remaining time"
- }
- } else {
- entry.applyExpiry = true
- entry.newExpiry = next
- }
- default:
- next := rec.ExpiryTime - addExpiryMs
- if next >= 0 {
- if _, exists := skippedReasons[email]; !exists {
- skippedReasons[email] = "reduction exceeds delay window"
- }
- } else {
- entry.applyExpiry = true
- entry.newExpiry = next
- }
- }
- }
- if addBytes != 0 {
- if rec.TotalGB == 0 {
- if _, exists := skippedReasons[email]; !exists {
- skippedReasons[email] = "unlimited traffic"
- }
- } else {
- next := max(rec.TotalGB+addBytes, 0)
- entry.applyTotal = true
- entry.newTotal = next
- }
- }
- if entry.applyExpiry || entry.applyTotal {
- plan[email] = entry
- }
- }
- if len(plan) == 0 {
- for email, reason := range skippedReasons {
- result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
- }
- return result, false, nil
- }
- plannedIds := make([]int, 0, len(plan))
- recordIdToEmail := make(map[int]string, len(plan))
- for email, entry := range plan {
- plannedIds = append(plannedIds, entry.record.Id)
- recordIdToEmail[entry.record.Id] = email
- }
- var mappings []model.ClientInbound
- for _, batch := range chunkInts(plannedIds, sqlInChunk) {
- var rows []model.ClientInbound
- if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
- return result, false, err
- }
- mappings = append(mappings, rows...)
- }
- emailsByInbound := map[int][]string{}
- for _, m := range mappings {
- email, ok := recordIdToEmail[m.ClientId]
- if !ok {
- continue
- }
- emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
- }
- needRestart := false
- for inboundId, ibEmails := range emailsByInbound {
- ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
- if ibRes.needRestart {
- needRestart = true
- }
- for email, reason := range ibRes.perEmailSkipped {
- if _, already := skippedReasons[email]; !already {
- skippedReasons[email] = reason
- }
- }
- }
- for email, entry := range plan {
- if _, skipped := skippedReasons[email]; skipped {
- continue
- }
- updates := map[string]any{}
- if entry.applyExpiry {
- updates["expiry_time"] = entry.newExpiry
- }
- if entry.applyTotal {
- updates["total"] = entry.newTotal
- }
- if len(updates) == 0 {
- continue
- }
- if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
- if _, already := skippedReasons[email]; !already {
- skippedReasons[email] = err.Error()
- }
- continue
- }
- result.Adjusted++
- }
- for email, reason := range skippedReasons {
- result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
- }
- return result, needRestart, nil
- }
// bulkInboundAdjustResult is the outcome of adjusting clients inside one inbound.
type bulkInboundAdjustResult struct {
	// perEmailSkipped maps an email to the reason it could not be adjusted here.
	perEmailSkipped map[string]string
	// needRestart is propagated by BulkAdjust into its own restart flag.
	needRestart bool
}
- // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
- // inside a single inbound's settings JSON. The xray runtime is updated
- // only for remote-node inbounds; local nodes do not need a notification
- // because the AddUser payload does not include totalGB/expiryTime —
- // changing those fields is identity-preserving and the panel's traffic
- // enforcement loop picks up the new limits from ClientTraffic directly.
- func (s *ClientService) bulkAdjustInboundClients(
- inboundSvc *InboundService,
- inboundId int,
- emails []string,
- plan map[string]*bulkAdjustEntry,
- ) bulkInboundAdjustResult {
- res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
- defer lockInbound(inboundId).Unlock()
- oldInbound, err := inboundSvc.GetInbound(inboundId)
- if err != nil {
- logger.Error("Load Old Data Error")
- for _, e := range emails {
- res.perEmailSkipped[e] = err.Error()
- }
- return res
- }
- var settings map[string]any
- if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
- for _, e := range emails {
- res.perEmailSkipped[e] = err.Error()
- }
- return res
- }
- // Match by email — the client's stable identity (see Delete). Credentials
- // can drift from the inbound JSON, so they are never used for matching.
- wantedEmails := make(map[string]struct{}, len(emails))
- for _, email := range emails {
- if plan[email] == nil {
- res.perEmailSkipped[email] = "client not found"
- continue
- }
- wantedEmails[email] = struct{}{}
- }
- interfaceClients, _ := settings["clients"].([]any)
- foundEmails := map[string]bool{}
- nowMs := time.Now().Unix() * 1000
- for i, client := range interfaceClients {
- c, ok := client.(map[string]any)
- if !ok {
- continue
- }
- targetEmail, _ := c["email"].(string)
- if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
- continue
- }
- entry := plan[targetEmail]
- if entry.applyExpiry {
- c["expiryTime"] = entry.newExpiry
- }
- if entry.applyTotal {
- c["totalGB"] = entry.newTotal
- }
- c["updated_at"] = nowMs
- interfaceClients[i] = c
- foundEmails[targetEmail] = true
- }
- for email := range wantedEmails {
- if !foundEmails[email] {
- res.perEmailSkipped[email] = "Client Not Found In Inbound"
- }
- }
- if len(foundEmails) == 0 {
- return res
- }
- settings["clients"] = interfaceClients
- newSettings, err := json.MarshalIndent(settings, "", " ")
- if err != nil {
- for email := range foundEmails {
- res.perEmailSkipped[email] = err.Error()
- }
- return res
- }
- oldInbound.Settings = string(newSettings)
- markDirty := false
- if oldInbound.NodeID != nil {
- rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
- if perr != nil {
- for email := range foundEmails {
- res.perEmailSkipped[email] = perr.Error()
- delete(foundEmails, email)
- }
- } else {
- if dirty {
- markDirty = true
- }
- // Large batches collapse into one reconcile push rather than M updates.
- if push && len(foundEmails) > nodeBulkPushThreshold {
- markDirty = true
- push = false
- }
- if push {
- for email := range foundEmails {
- entry := plan[email]
- updated := *entry.record.ToClient()
- if entry.applyExpiry {
- updated.ExpiryTime = entry.newExpiry
- }
- if entry.applyTotal {
- updated.TotalGB = entry.newTotal
- }
- updated.UpdatedAt = nowMs
- if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
- logger.Warning("Error in updating client on", rt.Name(), ":", err1)
- markDirty = true
- }
- }
- }
- }
- }
- // Serialize against the traffic poll to avoid the cross-transaction
- // lock-order deadlock on inbounds/client_records (runSerializedTx).
- txErr := runSerializedTx(func(tx *gorm.DB) error {
- if err := tx.Save(oldInbound).Error; err != nil {
- return err
- }
- finalClients, gcErr := inboundSvc.GetClients(oldInbound)
- if gcErr != nil {
- return gcErr
- }
- return s.SyncInbound(tx, inboundId, finalClients)
- })
- if txErr != nil {
- for email := range foundEmails {
- if _, skip := res.perEmailSkipped[email]; !skip {
- res.perEmailSkipped[email] = txErr.Error()
- }
- }
- } else if markDirty && oldInbound.NodeID != nil {
- if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
- logger.Warning("mark node dirty failed:", dErr)
- }
- }
- return res
- }
type (
	// BulkDeleteResult mirrors BulkAdjustResult: the number of clients deleted
	// plus one report per email that could not be processed.
	BulkDeleteResult struct {
		// Deleted counts the clients whose rows were removed.
		Deleted int `json:"deleted"`
		// Skipped lists the emails that were not deleted, with a reason.
		Skipped []BulkDeleteReport `json:"skipped,omitempty"`
	}

	// BulkDeleteReport pairs an email with the reason it was skipped.
	BulkDeleteReport struct {
		Email  string `json:"email"`
		Reason string `json:"reason"`
	}
)
- // BulkDelete removes every client in the list in one optimized pass.
- // Instead of running the full single-delete pipeline N times (which would
- // re-read, re-parse, and re-write each inbound's settings JSON for every
- // email), it groups emails by inbound and performs a single
- // read-modify-write per inbound. Per-row DB cleanups are also batched with
- // IN-clause queries at the end. Errors on a particular email are recorded
- // in the Skipped list and processing continues for the rest.
- func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
- result := BulkDeleteResult{}
- seen := map[string]struct{}{}
- cleanEmails := make([]string, 0, len(emails))
- for _, e := range emails {
- e = strings.TrimSpace(e)
- if e == "" {
- continue
- }
- if _, ok := seen[e]; ok {
- continue
- }
- seen[e] = struct{}{}
- cleanEmails = append(cleanEmails, e)
- }
- if len(cleanEmails) == 0 {
- return result, false, nil
- }
- db := database.GetDB()
- var records []model.ClientRecord
- for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
- var rows []model.ClientRecord
- if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
- return result, false, err
- }
- records = append(records, rows...)
- }
- recordsByEmail := make(map[string]*model.ClientRecord, len(records))
- tombstoneEmails := make([]string, 0, len(records))
- for i := range records {
- recordsByEmail[records[i].Email] = &records[i]
- tombstoneEmails = append(tombstoneEmails, records[i].Email)
- }
- tombstoneClientEmails(tombstoneEmails)
- skippedReasons := map[string]string{}
- for _, email := range cleanEmails {
- if _, ok := recordsByEmail[email]; !ok {
- skippedReasons[email] = "client not found"
- }
- }
- clientIds := make([]int, 0, len(recordsByEmail))
- recordIdToEmail := make(map[int]string, len(recordsByEmail))
- for _, r := range recordsByEmail {
- clientIds = append(clientIds, r.Id)
- recordIdToEmail[r.Id] = r.Email
- }
- emailsByInbound := map[int][]string{}
- if len(clientIds) > 0 {
- var mappings []model.ClientInbound
- for _, batch := range chunkInts(clientIds, sqlInChunk) {
- var rows []model.ClientInbound
- if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
- return result, false, err
- }
- mappings = append(mappings, rows...)
- }
- for _, m := range mappings {
- email, ok := recordIdToEmail[m.ClientId]
- if !ok {
- continue
- }
- emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
- }
- }
- needRestart := false
- for inboundId, ibEmails := range emailsByInbound {
- ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
- if ibResult.needRestart {
- needRestart = true
- }
- for email, reason := range ibResult.perEmailSkipped {
- if _, already := skippedReasons[email]; !already {
- skippedReasons[email] = reason
- }
- }
- }
- successEmails := make([]string, 0, len(recordsByEmail))
- successIds := make([]int, 0, len(recordsByEmail))
- for email, rec := range recordsByEmail {
- if _, skipped := skippedReasons[email]; skipped {
- continue
- }
- successEmails = append(successEmails, email)
- successIds = append(successIds, rec.Id)
- }
- if len(successIds) > 0 {
- // Serialize the row cleanup against the traffic poll to avoid the
- // cross-transaction lock-order deadlock on client_traffics/inbounds.
- if err := runSerializedTx(func(tx *gorm.DB) error {
- for _, batch := range chunkInts(successIds, sqlInChunk) {
- if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
- return e
- }
- }
- if !keepTraffic && len(successEmails) > 0 {
- for _, batch := range chunkStrings(successEmails, sqlInChunk) {
- if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
- return e
- }
- if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
- return e
- }
- }
- }
- for _, batch := range chunkInts(successIds, sqlInChunk) {
- if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
- return e
- }
- }
- return nil
- }); err != nil {
- return result, needRestart, err
- }
- }
- result.Deleted = len(successEmails)
- for email, reason := range skippedReasons {
- result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
- }
- return result, needRestart, nil
- }
// bulkInboundDeleteResult is the outcome of deleting clients from one inbound.
type bulkInboundDeleteResult struct {
	// perEmailSkipped maps an email to the reason it could not be removed here.
	perEmailSkipped map[string]string
	// needRestart is set when a runtime removal could not be applied live.
	needRestart bool
}
- // bulkDelInboundClients removes multiple clients from a single inbound's
- // settings JSON in one read-modify-write cycle, runs the xray runtime
- // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
- // holds per-email failure reasons; emails not present in the map are
- // considered successful for this inbound.
- func (s *ClientService) bulkDelInboundClients(
- inboundSvc *InboundService,
- inboundId int,
- emails []string,
- records map[string]*model.ClientRecord,
- keepTraffic bool,
- ) bulkInboundDeleteResult {
- res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
- defer lockInbound(inboundId).Unlock()
- oldInbound, err := inboundSvc.GetInbound(inboundId)
- if err != nil {
- logger.Error("Load Old Data Error")
- for _, e := range emails {
- res.perEmailSkipped[e] = err.Error()
- }
- return res
- }
- var settings map[string]any
- if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
- for _, e := range emails {
- res.perEmailSkipped[e] = err.Error()
- }
- return res
- }
- // Match by email — the client's stable identity (see Delete). Removes every
- // entry carrying a wanted email, independent of credential drift.
- wantedEmails := make(map[string]struct{}, len(emails))
- for _, email := range emails {
- if records[email] == nil {
- res.perEmailSkipped[email] = "client not found"
- continue
- }
- wantedEmails[email] = struct{}{}
- }
- interfaceClients, _ := settings["clients"].([]any)
- newClients := make([]any, 0, len(interfaceClients))
- foundEmails := map[string]bool{}
- enableByEmail := map[string]bool{}
- for _, client := range interfaceClients {
- c, ok := client.(map[string]any)
- if !ok {
- newClients = append(newClients, client)
- continue
- }
- em, _ := c["email"].(string)
- if _, found := wantedEmails[em]; found && em != "" {
- foundEmails[em] = true
- en, _ := c["enable"].(bool)
- enableByEmail[em] = en
- continue
- }
- newClients = append(newClients, client)
- }
- for email := range wantedEmails {
- if !foundEmails[email] {
- res.perEmailSkipped[email] = "Client Not Found In Inbound"
- }
- }
- db := database.GetDB()
- newClients = compactOrphans(db, newClients)
- if newClients == nil {
- newClients = []any{}
- }
- settings["clients"] = newClients
- newSettings, err := json.MarshalIndent(settings, "", " ")
- if err != nil {
- for email := range foundEmails {
- if _, skip := res.perEmailSkipped[email]; !skip {
- res.perEmailSkipped[email] = err.Error()
- }
- }
- return res
- }
- oldInbound.Settings = string(newSettings)
- foundList := make([]string, 0, len(foundEmails))
- for email := range foundEmails {
- foundList = append(foundList, email)
- }
- notDepletedByEmail := map[string]bool{}
- if len(foundList) > 0 {
- type trafficRow struct {
- Email string
- Enable bool
- }
- for _, batch := range chunkStrings(foundList, sqlInChunk) {
- var rows []trafficRow
- if err := db.Model(xray.ClientTraffic{}).
- Where("email IN ?", batch).
- Select("email, enable").
- Scan(&rows).Error; err == nil {
- for _, r := range rows {
- notDepletedByEmail[r.Email] = r.Enable
- }
- }
- }
- }
- var sharedSet map[string]bool
- if !keepTraffic {
- var sharedErr error
- sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
- if sharedErr != nil {
- for email := range foundEmails {
- res.perEmailSkipped[email] = sharedErr.Error()
- delete(foundEmails, email)
- }
- return res
- }
- }
- if !keepTraffic {
- purge := make([]string, 0, len(foundEmails))
- for email := range foundEmails {
- if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
- purge = append(purge, email)
- }
- }
- if len(purge) > 0 {
- // Serialize the IP/stat purge against the traffic poll to avoid the
- // cross-transaction lock-order deadlock on client_traffics.
- if delErr := runSerializedTx(func(tx *gorm.DB) error {
- if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
- logger.Error("Error in delete client IPs")
- return e
- }
- if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
- logger.Error("Delete stats Data Error")
- return e
- }
- return nil
- }); delErr != nil {
- for _, email := range purge {
- res.perEmailSkipped[email] = delErr.Error()
- delete(foundEmails, email)
- }
- }
- }
- }
- markDirty := false
- if oldInbound.NodeID == nil {
- rt, rterr := inboundSvc.runtimeFor(oldInbound)
- if rterr != nil {
- res.needRestart = true
- } else {
- for email := range foundEmails {
- if !enableByEmail[email] || !notDepletedByEmail[email] {
- continue
- }
- err1 := rt.RemoveUser(context.Background(), oldInbound, email)
- if err1 == nil {
- logger.Debug("Client deleted on", rt.Name(), ":", email)
- } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
- logger.Debug("User is already deleted. Nothing to do more...")
- } else {
- logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
- res.needRestart = true
- }
- }
- }
- } else {
- rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
- if perr != nil {
- for email := range foundEmails {
- res.perEmailSkipped[email] = perr.Error()
- delete(foundEmails, email)
- }
- } else {
- if dirty {
- markDirty = true
- }
- // Large batches collapse into one reconcile push rather than M deletes.
- if push && len(foundEmails) > nodeBulkPushThreshold {
- markDirty = true
- push = false
- }
- if push {
- for email := range foundEmails {
- if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
- logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
- markDirty = true
- }
- }
- }
- }
- }
- // Serialize against the traffic poll to avoid the cross-transaction
- // lock-order deadlock on inbounds/client_records (runSerializedTx).
- txErr := runSerializedTx(func(tx *gorm.DB) error {
- if err := tx.Save(oldInbound).Error; err != nil {
- return err
- }
- finalClients, err := inboundSvc.GetClients(oldInbound)
- if err != nil {
- return err
- }
- return s.SyncInbound(tx, inboundId, finalClients)
- })
- if txErr != nil {
- for email := range foundEmails {
- if _, skip := res.perEmailSkipped[email]; !skip {
- res.perEmailSkipped[email] = txErr.Error()
- }
- }
- } else if markDirty && oldInbound.NodeID != nil {
- if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
- logger.Warning("mark node dirty failed:", dErr)
- }
- }
- return res
- }
type (
	// BulkCreateResult mirrors BulkAdjustResult for the create flow: a count
	// of created clients plus one report per payload that was skipped.
	BulkCreateResult struct {
		Created int                `json:"created"`
		Skipped []BulkCreateReport `json:"skipped,omitempty"`
	}

	// BulkCreateReport pairs an email with the reason its payload was skipped.
	BulkCreateReport struct {
		Email  string `json:"email"`
		Reason string `json:"reason"`
	}
)
- func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
- result := BulkCreateResult{}
- if len(payloads) == 0 {
- return result, false, nil
- }
- skip := func(email, reason string) {
- if strings.TrimSpace(email) == "" {
- email = "(missing email)"
- }
- result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
- }
- emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
- if err != nil {
- emailSubIDs = nil
- }
- type prepared struct {
- client model.Client
- inboundIds []int
- }
- prep := make([]prepared, 0, len(payloads))
- emails := make([]string, 0, len(payloads))
- subIDs := make([]string, 0, len(payloads))
- seenEmail := make(map[string]struct{}, len(payloads))
- seenSubID := make(map[string]string, len(payloads))
- for i := range payloads {
- client := payloads[i].Client
- email := strings.TrimSpace(client.Email)
- if email == "" {
- skip("", "client email is required")
- continue
- }
- if verr := validateClientEmail(email); verr != nil {
- skip(email, verr.Error())
- continue
- }
- if verr := validateClientSubID(client.SubID); verr != nil {
- skip(email, verr.Error())
- continue
- }
- if len(payloads[i].InboundIds) == 0 {
- skip(email, "at least one inbound is required")
- continue
- }
- client.Email = email
- if client.SubID == "" {
- client.SubID = uuid.NewString()
- }
- if !client.Enable {
- client.Enable = true
- }
- now := time.Now().UnixMilli()
- if client.CreatedAt == 0 {
- client.CreatedAt = now
- }
- client.UpdatedAt = now
- le := strings.ToLower(email)
- if _, dup := seenEmail[le]; dup {
- skip(email, "email already in use: "+email)
- continue
- }
- if owner, ok := seenSubID[client.SubID]; ok && owner != le {
- skip(email, "subId already in use: "+client.SubID)
- continue
- }
- seenEmail[le] = struct{}{}
- seenSubID[client.SubID] = le
- prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
- emails = append(emails, email)
- subIDs = append(subIDs, client.SubID)
- }
- if len(prep) == 0 {
- return result, false, nil
- }
- db := database.GetDB()
- const lookupChunk = 400
- existingEmailSub := make(map[string]string, len(emails))
- for start := 0; start < len(emails); start += lookupChunk {
- end := min(start+lookupChunk, len(emails))
- var rows []model.ClientRecord
- if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
- return result, false, e
- }
- for i := range rows {
- existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
- }
- }
- existingSubOwner := make(map[string]string, len(subIDs))
- for start := 0; start < len(subIDs); start += lookupChunk {
- end := min(start+lookupChunk, len(subIDs))
- var rows []model.ClientRecord
- if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
- return result, false, e
- }
- for i := range rows {
- existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
- }
- }
- inboundCache := make(map[int]*model.Inbound)
- getIb := func(id int) (*model.Inbound, error) {
- if ib, ok := inboundCache[id]; ok {
- return ib, nil
- }
- ib, e := inboundSvc.GetInbound(id)
- if e != nil {
- return nil, e
- }
- inboundCache[id] = ib
- return ib, nil
- }
- byInbound := make(map[int][]model.Client)
- idxByInbound := make(map[int][]int)
- inboundOrder := make([]int, 0)
- failed := make([]bool, len(prep))
- reason := make([]string, len(prep))
- for idx := range prep {
- le := strings.ToLower(prep[idx].client.Email)
- if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
- failed[idx] = true
- reason[idx] = "email already in use: " + prep[idx].client.Email
- continue
- }
- if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
- failed[idx] = true
- reason[idx] = "subId already in use: " + prep[idx].client.SubID
- continue
- }
- ok := true
- for _, ibId := range prep[idx].inboundIds {
- ib, e := getIb(ibId)
- if e != nil {
- failed[idx] = true
- reason[idx] = e.Error()
- ok = false
- break
- }
- if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
- failed[idx] = true
- reason[idx] = e.Error()
- ok = false
- break
- }
- }
- if !ok {
- continue
- }
- for _, ibId := range prep[idx].inboundIds {
- ib, _ := getIb(ibId)
- if _, seen := byInbound[ibId]; !seen {
- inboundOrder = append(inboundOrder, ibId)
- }
- byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
- idxByInbound[ibId] = append(idxByInbound[ibId], idx)
- }
- }
- needRestart := false
- for _, ibId := range inboundOrder {
- payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
- if e == nil {
- var nr bool
- nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
- if e == nil && nr {
- needRestart = true
- }
- }
- if e != nil {
- for _, idx := range idxByInbound[ibId] {
- failed[idx] = true
- if reason[idx] == "" {
- reason[idx] = e.Error()
- }
- }
- }
- }
- for idx := range prep {
- if failed[idx] {
- skip(prep[idx].client.Email, reason[idx])
- } else {
- result.Created++
- }
- }
- return result, needRestart, nil
- }
- func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
- db := database.GetDB()
- now := time.Now().UnixMilli()
- depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
- var rows []xray.ClientTraffic
- if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
- return 0, false, err
- }
- if len(rows) == 0 {
- return 0, false, nil
- }
- seen := make(map[string]struct{}, len(rows))
- emails := make([]string, 0, len(rows))
- for _, r := range rows {
- if r.Email == "" {
- continue
- }
- if _, ok := seen[r.Email]; ok {
- continue
- }
- seen[r.Email] = struct{}{}
- emails = append(emails, r.Email)
- }
- if len(emails) == 0 {
- return 0, false, nil
- }
- res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
- if err != nil {
- return res.Deleted, needRestart, err
- }
- return res.Deleted, needRestart, nil
- }
|