package service import ( "context" "encoding/json" "fmt" "strings" "time" "github.com/google/uuid" "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/database/model" "github.com/mhsanaei/3x-ui/v3/internal/logger" "github.com/mhsanaei/3x-ui/v3/internal/util/common" "github.com/mhsanaei/3x-ui/v3/internal/xray" "gorm.io/gorm" ) // BulkAttachResult reports the outcome of a bulk attach across target inbounds. type BulkAttachResult struct { Attached []string `json:"attached"` Skipped []string `json:"skipped"` Errors []string `json:"errors"` } // BulkAttach attaches the given existing clients (by email) to each target inbound, // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds // all clients to a target in a single AddInboundClient call, and reports clients already // present on a target as skipped. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) { result := &BulkAttachResult{} if len(emails) == 0 || len(inboundIds) == 0 { return result, false, nil } recordErr := func(format string, args ...any) { msg := fmt.Sprintf(format, args...) result.Errors = append(result.Errors, msg) logger.Warningf("[BulkAttach] %s", msg) } records := make([]*model.ClientRecord, 0, len(emails)) seenEmail := make(map[string]struct{}, len(emails)) for _, email := range emails { if email == "" { continue } key := strings.ToLower(email) if _, ok := seenEmail[key]; ok { continue } seenEmail[key] = struct{}{} rec, err := s.GetRecordByEmail(nil, email) if err != nil { recordErr("%s: %v", email, err) continue } records = append(records, rec) } emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs() if sidErr != nil { emailSubIDs = nil logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr) } needRestart := false for _, ibId := range inboundIds { inbound, err := inboundSvc.GetInbound(ibId) if err != nil { recordErr("inbound %d: %v", ibId, err) continue } existingClients, err := inboundSvc.GetClients(inbound) if err != nil { recordErr("inbound %d: %v", ibId, err) continue } have := make(map[string]struct{}, len(existingClients)) for _, c := range existingClients { have[strings.ToLower(c.Email)] = struct{}{} } clientsToAdd := make([]model.Client, 0, len(records)) for _, rec := range records { if _, attached := have[strings.ToLower(rec.Email)]; attached { result.Skipped = append(result.Skipped, rec.Email) continue } client := *rec.ToClient() client.UpdatedAt = time.Now().UnixMilli() if err := s.fillProtocolDefaults(&client, inbound); err != nil { recordErr("%s -> inbound %d: %v", rec.Email, ibId, err) continue } clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound)) } if len(clientsToAdd) == 0 { continue } payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd}) if err != nil { recordErr("inbound %d: %v", ibId, err) continue } nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs) if err != nil { recordErr("inbound %d: %v", ibId, err) continue } if nr { needRestart = true } for _, c := range clientsToAdd { result.Attached = append(result.Attached, c.Email) } } return result, needRestart, nil } // BulkDetachResult reports the outcome of a bulk detach across target inbounds. type BulkDetachResult struct { Detached []string `json:"detached"` Skipped []string `json:"skipped"` Errors []string `json:"errors"` } // BulkDetach detaches the given existing clients (by email) from each target inbound. // (email, inbound) pairs where the client is not currently attached are silently skipped // at the inbound level; emails that aren't attached to any of the requested inbounds // are reported under skipped. ClientRecord rows are kept even when they become orphaned // (matches single-client detach semantics); callers should use bulkDelete for full removal. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) { result := &BulkDetachResult{} if len(emails) == 0 || len(inboundIds) == 0 { return result, false, nil } recordErr := func(format string, args ...any) { msg := fmt.Sprintf(format, args...) result.Errors = append(result.Errors, msg) logger.Warningf("[BulkDetach] %s", msg) } requested := make(map[int]struct{}, len(inboundIds)) for _, id := range inboundIds { requested[id] = struct{}{} } recsByInbound := make(map[int][]*model.ClientRecord) emailOrder := make([]string, 0, len(emails)) emailRepr := make(map[string]string, len(emails)) emailFailed := make(map[string]bool, len(emails)) seenEmail := make(map[string]struct{}, len(emails)) for _, email := range emails { if email == "" { continue } key := strings.ToLower(email) if _, ok := seenEmail[key]; ok { continue } seenEmail[key] = struct{}{} rec, err := s.GetRecordByEmail(nil, email) if err != nil { recordErr("%s: %v", email, err) continue } currentIds, err := s.GetInboundIdsForRecord(rec.Id) if err != nil { recordErr("%s: %v", email, err) continue } matched := false for _, id := range currentIds { if _, ok := requested[id]; ok { recsByInbound[id] = append(recsByInbound[id], rec) matched = true } } if !matched { result.Skipped = append(result.Skipped, rec.Email) continue } emailOrder = append(emailOrder, key) emailRepr[key] = rec.Email } needRestart := false for _, ibId := range inboundIds { recs, ok := recsByInbound[ibId] if !ok { continue } delete(recsByInbound, ibId) nr, err := s.delInboundClients(inboundSvc, ibId, recs, true) if err != nil { recordErr("inbound %d: %v", ibId, err) for _, rec := range recs { emailFailed[strings.ToLower(rec.Email)] = true } continue } if nr { needRestart = true } } for _, key := range emailOrder { if emailFailed[key] { continue } result.Detached = append(result.Detached, emailRepr[key]) } return result, needRestart, nil } // BulkAdjustResult is returned by BulkAdjust to report how many clients were // successfully updated and which were skipped (typically because the field // being adjusted was unlimited for that client) or failed. type BulkAdjustResult struct { Adjusted int `json:"adjusted"` Skipped []BulkAdjustReport `json:"skipped,omitempty"` } type BulkAdjustReport struct { Email string `json:"email"` Reason string `json:"reason"` } type bulkAdjustEntry struct { record *model.ClientRecord applyExpiry bool newExpiry int64 applyTotal bool newTotal int64 } // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes // for every email in the list. Clients whose corresponding field is // unlimited (0) are skipped — bulk extend should not accidentally // limit an unlimited client. addDays and addBytes may be negative. // // Like BulkDelete, the work is grouped by inbound so each inbound's // settings JSON is parsed and written exactly once regardless of how // many target emails it contains. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) { result := BulkAdjustResult{} if len(emails) == 0 { return result, false, nil } if addDays == 0 && addBytes == 0 { return result, false, common.NewError("no adjustment specified") } addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000 seen := map[string]struct{}{} cleanEmails := make([]string, 0, len(emails)) for _, e := range emails { e = strings.TrimSpace(e) if e == "" { continue } if _, ok := seen[e]; ok { continue } seen[e] = struct{}{} cleanEmails = append(cleanEmails, e) } if len(cleanEmails) == 0 { return result, false, nil } db := database.GetDB() var records []model.ClientRecord for _, batch := range chunkStrings(cleanEmails, sqlInChunk) { var rows []model.ClientRecord if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil { return result, false, err } records = append(records, rows...) } recordsByEmail := make(map[string]*model.ClientRecord, len(records)) for i := range records { recordsByEmail[records[i].Email] = &records[i] } skippedReasons := map[string]string{} for _, email := range cleanEmails { if _, ok := recordsByEmail[email]; !ok { skippedReasons[email] = "client not found" } } plan := map[string]*bulkAdjustEntry{} for email, rec := range recordsByEmail { entry := &bulkAdjustEntry{record: rec} if addDays != 0 { switch { case rec.ExpiryTime == 0: if _, exists := skippedReasons[email]; !exists { skippedReasons[email] = "unlimited expiry" } case rec.ExpiryTime > 0: next := rec.ExpiryTime + addExpiryMs if next <= 0 { if _, exists := skippedReasons[email]; !exists { skippedReasons[email] = "reduction exceeds remaining time" } } else { entry.applyExpiry = true entry.newExpiry = next } default: next := rec.ExpiryTime - addExpiryMs if next >= 0 { if _, exists := skippedReasons[email]; !exists { skippedReasons[email] = "reduction exceeds delay window" } } else { entry.applyExpiry = true entry.newExpiry = next } } } if addBytes != 0 { if rec.TotalGB == 0 { if _, exists := skippedReasons[email]; !exists { skippedReasons[email] = "unlimited traffic" } } else { next := max(rec.TotalGB+addBytes, 0) entry.applyTotal = true entry.newTotal = next } } if entry.applyExpiry || entry.applyTotal { plan[email] = entry } } if len(plan) == 0 { for email, reason := range skippedReasons { result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason}) } return result, false, nil } plannedIds := make([]int, 0, len(plan)) recordIdToEmail := make(map[int]string, len(plan)) for email, entry := range plan { plannedIds = append(plannedIds, entry.record.Id) recordIdToEmail[entry.record.Id] = email } var mappings []model.ClientInbound for _, batch := range chunkInts(plannedIds, sqlInChunk) { var rows []model.ClientInbound if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil { return result, false, err } mappings = append(mappings, rows...) } emailsByInbound := map[int][]string{} for _, m := range mappings { email, ok := recordIdToEmail[m.ClientId] if !ok { continue } emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email) } needRestart := false for inboundId, ibEmails := range emailsByInbound { ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan) if ibRes.needRestart { needRestart = true } for email, reason := range ibRes.perEmailSkipped { if _, already := skippedReasons[email]; !already { skippedReasons[email] = reason } } } for email, entry := range plan { if _, skipped := skippedReasons[email]; skipped { continue } updates := map[string]any{} if entry.applyExpiry { updates["expiry_time"] = entry.newExpiry } if entry.applyTotal { updates["total"] = entry.newTotal } if len(updates) == 0 { continue } if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil { if _, already := skippedReasons[email]; !already { skippedReasons[email] = err.Error() } continue } result.Adjusted++ } for email, reason := range skippedReasons { result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason}) } return result, needRestart, nil } type bulkInboundAdjustResult struct { perEmailSkipped map[string]string needRestart bool } // bulkAdjustInboundClients applies expiry/total deltas to multiple clients // inside a single inbound's settings JSON. The xray runtime is updated // only for remote-node inbounds; local nodes do not need a notification // because the AddUser payload does not include totalGB/expiryTime — // changing those fields is identity-preserving and the panel's traffic // enforcement loop picks up the new limits from ClientTraffic directly. func (s *ClientService) bulkAdjustInboundClients( inboundSvc *InboundService, inboundId int, emails []string, plan map[string]*bulkAdjustEntry, ) bulkInboundAdjustResult { res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}} defer lockInbound(inboundId).Unlock() oldInbound, err := inboundSvc.GetInbound(inboundId) if err != nil { logger.Error("Load Old Data Error") for _, e := range emails { res.perEmailSkipped[e] = err.Error() } return res } var settings map[string]any if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil { for _, e := range emails { res.perEmailSkipped[e] = err.Error() } return res } // Match by email — the client's stable identity (see Delete). Credentials // can drift from the inbound JSON, so they are never used for matching. wantedEmails := make(map[string]struct{}, len(emails)) for _, email := range emails { if plan[email] == nil { res.perEmailSkipped[email] = "client not found" continue } wantedEmails[email] = struct{}{} } interfaceClients, _ := settings["clients"].([]any) foundEmails := map[string]bool{} nowMs := time.Now().Unix() * 1000 for i, client := range interfaceClients { c, ok := client.(map[string]any) if !ok { continue } targetEmail, _ := c["email"].(string) if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" { continue } entry := plan[targetEmail] if entry.applyExpiry { c["expiryTime"] = entry.newExpiry } if entry.applyTotal { c["totalGB"] = entry.newTotal } c["updated_at"] = nowMs interfaceClients[i] = c foundEmails[targetEmail] = true } for email := range wantedEmails { if !foundEmails[email] { res.perEmailSkipped[email] = "Client Not Found In Inbound" } } if len(foundEmails) == 0 { return res } settings["clients"] = interfaceClients newSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { for email := range foundEmails { res.perEmailSkipped[email] = err.Error() } return res } oldInbound.Settings = string(newSettings) markDirty := false if oldInbound.NodeID != nil { rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { for email := range foundEmails { res.perEmailSkipped[email] = perr.Error() delete(foundEmails, email) } } else { if dirty { markDirty = true } if push { for email := range foundEmails { entry := plan[email] updated := *entry.record.ToClient() if entry.applyExpiry { updated.ExpiryTime = entry.newExpiry } if entry.applyTotal { updated.TotalGB = entry.newTotal } updated.UpdatedAt = nowMs if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil { logger.Warning("Error in updating client on", rt.Name(), ":", err1) markDirty = true } } } } } db := database.GetDB() txErr := db.Transaction(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } finalClients, gcErr := inboundSvc.GetClients(oldInbound) if gcErr != nil { return gcErr } return s.SyncInbound(tx, inboundId, finalClients) }) if txErr != nil { for email := range foundEmails { if _, skip := res.perEmailSkipped[email]; !skip { res.perEmailSkipped[email] = txErr.Error() } } } else if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) } } return res } // BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email // skip reasons when an email could not be processed. type BulkDeleteResult struct { Deleted int `json:"deleted"` Skipped []BulkDeleteReport `json:"skipped,omitempty"` } type BulkDeleteReport struct { Email string `json:"email"` Reason string `json:"reason"` } // BulkDelete removes every client in the list in one optimized pass. // Instead of running the full single-delete pipeline N times (which would // re-read, re-parse, and re-write each inbound's settings JSON for every // email), it groups emails by inbound and performs a single // read-modify-write per inbound. Per-row DB cleanups are also batched with // IN-clause queries at the end. Errors on a particular email are recorded // in the Skipped list and processing continues for the rest. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) { result := BulkDeleteResult{} seen := map[string]struct{}{} cleanEmails := make([]string, 0, len(emails)) for _, e := range emails { e = strings.TrimSpace(e) if e == "" { continue } if _, ok := seen[e]; ok { continue } seen[e] = struct{}{} cleanEmails = append(cleanEmails, e) } if len(cleanEmails) == 0 { return result, false, nil } db := database.GetDB() var records []model.ClientRecord for _, batch := range chunkStrings(cleanEmails, sqlInChunk) { var rows []model.ClientRecord if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil { return result, false, err } records = append(records, rows...) } recordsByEmail := make(map[string]*model.ClientRecord, len(records)) tombstoneEmails := make([]string, 0, len(records)) for i := range records { recordsByEmail[records[i].Email] = &records[i] tombstoneEmails = append(tombstoneEmails, records[i].Email) } tombstoneClientEmails(tombstoneEmails) skippedReasons := map[string]string{} for _, email := range cleanEmails { if _, ok := recordsByEmail[email]; !ok { skippedReasons[email] = "client not found" } } clientIds := make([]int, 0, len(recordsByEmail)) recordIdToEmail := make(map[int]string, len(recordsByEmail)) for _, r := range recordsByEmail { clientIds = append(clientIds, r.Id) recordIdToEmail[r.Id] = r.Email } emailsByInbound := map[int][]string{} if len(clientIds) > 0 { var mappings []model.ClientInbound for _, batch := range chunkInts(clientIds, sqlInChunk) { var rows []model.ClientInbound if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil { return result, false, err } mappings = append(mappings, rows...) } for _, m := range mappings { email, ok := recordIdToEmail[m.ClientId] if !ok { continue } emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email) } } needRestart := false for inboundId, ibEmails := range emailsByInbound { ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false) if ibResult.needRestart { needRestart = true } for email, reason := range ibResult.perEmailSkipped { if _, already := skippedReasons[email]; !already { skippedReasons[email] = reason } } } successEmails := make([]string, 0, len(recordsByEmail)) successIds := make([]int, 0, len(recordsByEmail)) for email, rec := range recordsByEmail { if _, skipped := skippedReasons[email]; skipped { continue } successEmails = append(successEmails, email) successIds = append(successIds, rec.Id) } if len(successIds) > 0 { for _, batch := range chunkInts(successIds, sqlInChunk) { if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil { return result, needRestart, err } } if !keepTraffic && len(successEmails) > 0 { for _, batch := range chunkStrings(successEmails, sqlInChunk) { if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil { return result, needRestart, err } if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil { return result, needRestart, err } } } for _, batch := range chunkInts(successIds, sqlInChunk) { if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil { return result, needRestart, err } } } result.Deleted = len(successEmails) for email, reason := range skippedReasons { result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason}) } return result, needRestart, nil } type bulkInboundDeleteResult struct { perEmailSkipped map[string]string needRestart bool } // bulkDelInboundClients removes multiple clients from a single inbound's // settings JSON in one read-modify-write cycle, runs the xray runtime // RemoveUser/DeleteUser calls, and persists the inbound. The returned map // holds per-email failure reasons; emails not present in the map are // considered successful for this inbound. func (s *ClientService) bulkDelInboundClients( inboundSvc *InboundService, inboundId int, emails []string, records map[string]*model.ClientRecord, keepTraffic bool, ) bulkInboundDeleteResult { res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}} defer lockInbound(inboundId).Unlock() oldInbound, err := inboundSvc.GetInbound(inboundId) if err != nil { logger.Error("Load Old Data Error") for _, e := range emails { res.perEmailSkipped[e] = err.Error() } return res } var settings map[string]any if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil { for _, e := range emails { res.perEmailSkipped[e] = err.Error() } return res } // Match by email — the client's stable identity (see Delete). Removes every // entry carrying a wanted email, independent of credential drift. wantedEmails := make(map[string]struct{}, len(emails)) for _, email := range emails { if records[email] == nil { res.perEmailSkipped[email] = "client not found" continue } wantedEmails[email] = struct{}{} } interfaceClients, _ := settings["clients"].([]any) newClients := make([]any, 0, len(interfaceClients)) foundEmails := map[string]bool{} enableByEmail := map[string]bool{} for _, client := range interfaceClients { c, ok := client.(map[string]any) if !ok { newClients = append(newClients, client) continue } em, _ := c["email"].(string) if _, found := wantedEmails[em]; found && em != "" { foundEmails[em] = true en, _ := c["enable"].(bool) enableByEmail[em] = en continue } newClients = append(newClients, client) } for email := range wantedEmails { if !foundEmails[email] { res.perEmailSkipped[email] = "Client Not Found In Inbound" } } db := database.GetDB() newClients = compactOrphans(db, newClients) if newClients == nil { newClients = []any{} } settings["clients"] = newClients newSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { for email := range foundEmails { if _, skip := res.perEmailSkipped[email]; !skip { res.perEmailSkipped[email] = err.Error() } } return res } oldInbound.Settings = string(newSettings) foundList := make([]string, 0, len(foundEmails)) for email := range foundEmails { foundList = append(foundList, email) } notDepletedByEmail := map[string]bool{} if len(foundList) > 0 { type trafficRow struct { Email string Enable bool } for _, batch := range chunkStrings(foundList, sqlInChunk) { var rows []trafficRow if err := db.Model(xray.ClientTraffic{}). Where("email IN ?", batch). Select("email, enable"). Scan(&rows).Error; err == nil { for _, r := range rows { notDepletedByEmail[r.Email] = r.Enable } } } } var sharedSet map[string]bool if !keepTraffic { var sharedErr error sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId) if sharedErr != nil { for email := range foundEmails { res.perEmailSkipped[email] = sharedErr.Error() delete(foundEmails, email) } return res } } if !keepTraffic { purge := make([]string, 0, len(foundEmails)) for email := range foundEmails { if !sharedSet[strings.ToLower(strings.TrimSpace(email))] { purge = append(purge, email) } } if len(purge) > 0 { if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil { logger.Error("Error in delete client IPs") for _, email := range purge { res.perEmailSkipped[email] = delErr.Error() delete(foundEmails, email) } } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil { logger.Error("Delete stats Data Error") for _, email := range purge { res.perEmailSkipped[email] = delErr.Error() delete(foundEmails, email) } } } } markDirty := false if oldInbound.NodeID == nil { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { res.needRestart = true } else { for email := range foundEmails { if !enableByEmail[email] || !notDepletedByEmail[email] { continue } err1 := rt.RemoveUser(context.Background(), oldInbound, email) if err1 == nil { logger.Debug("Client deleted on", rt.Name(), ":", email) } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) { logger.Debug("User is already deleted. Nothing to do more...") } else { logger.Debug("Error in deleting client on", rt.Name(), ":", err1) res.needRestart = true } } } } else { rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { for email := range foundEmails { res.perEmailSkipped[email] = perr.Error() delete(foundEmails, email) } } else { if dirty { markDirty = true } if push { for email := range foundEmails { if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { logger.Warning("Error in deleting client on", rt.Name(), ":", err1) markDirty = true } } } } } txErr := db.Transaction(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } finalClients, err := inboundSvc.GetClients(oldInbound) if err != nil { return err } return s.SyncInbound(tx, inboundId, finalClients) }) if txErr != nil { for email := range foundEmails { if _, skip := res.perEmailSkipped[email]; !skip { res.perEmailSkipped[email] = txErr.Error() } } } else if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) } } return res } // BulkCreateResult mirrors BulkAdjustResult for the create flow. type BulkCreateResult struct { Created int `json:"created"` Skipped []BulkCreateReport `json:"skipped,omitempty"` } type BulkCreateReport struct { Email string `json:"email"` Reason string `json:"reason"` } func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) { result := BulkCreateResult{} if len(payloads) == 0 { return result, false, nil } skip := func(email, reason string) { if strings.TrimSpace(email) == "" { email = "(missing email)" } result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason}) } emailSubIDs, err := inboundSvc.getAllEmailSubIDs() if err != nil { emailSubIDs = nil } type prepared struct { client model.Client inboundIds []int } prep := make([]prepared, 0, len(payloads)) emails := make([]string, 0, len(payloads)) subIDs := make([]string, 0, len(payloads)) seenEmail := make(map[string]struct{}, len(payloads)) seenSubID := make(map[string]string, len(payloads)) for i := range payloads { client := payloads[i].Client email := strings.TrimSpace(client.Email) if email == "" { skip("", "client email is required") continue } if verr := validateClientEmail(email); verr != nil { skip(email, verr.Error()) continue } if verr := validateClientSubID(client.SubID); verr != nil { skip(email, verr.Error()) continue } if len(payloads[i].InboundIds) == 0 { skip(email, "at least one inbound is required") continue } client.Email = email if client.SubID == "" { client.SubID = uuid.NewString() } if !client.Enable { client.Enable = true } now := time.Now().UnixMilli() if client.CreatedAt == 0 { client.CreatedAt = now } client.UpdatedAt = now le := strings.ToLower(email) if _, dup := seenEmail[le]; dup { skip(email, "email already in use: "+email) continue } if owner, ok := seenSubID[client.SubID]; ok && owner != le { skip(email, "subId already in use: "+client.SubID) continue } seenEmail[le] = struct{}{} seenSubID[client.SubID] = le prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds}) emails = append(emails, email) subIDs = append(subIDs, client.SubID) } if len(prep) == 0 { return result, false, nil } db := database.GetDB() const lookupChunk = 400 existingEmailSub := make(map[string]string, len(emails)) for start := 0; start < len(emails); start += lookupChunk { end := min(start+lookupChunk, len(emails)) var rows []model.ClientRecord if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil { return result, false, e } for i := range rows { existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID } } existingSubOwner := make(map[string]string, len(subIDs)) for start := 0; start < len(subIDs); start += lookupChunk { end := min(start+lookupChunk, len(subIDs)) var rows []model.ClientRecord if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil { return result, false, e } for i := range rows { existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email) } } inboundCache := make(map[int]*model.Inbound) getIb := func(id int) (*model.Inbound, error) { if ib, ok := inboundCache[id]; ok { return ib, nil } ib, e := inboundSvc.GetInbound(id) if e != nil { return nil, e } inboundCache[id] = ib return ib, nil } byInbound := make(map[int][]model.Client) idxByInbound := make(map[int][]int) inboundOrder := make([]int, 0) failed := make([]bool, len(prep)) reason := make([]string, len(prep)) for idx := range prep { le := strings.ToLower(prep[idx].client.Email) if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID { failed[idx] = true reason[idx] = "email already in use: " + prep[idx].client.Email continue } if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le { failed[idx] = true reason[idx] = "subId already in use: " + prep[idx].client.SubID continue } ok := true for _, ibId := range prep[idx].inboundIds { ib, e := getIb(ibId) if e != nil { failed[idx] = true reason[idx] = e.Error() ok = false break } if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil { failed[idx] = true reason[idx] = e.Error() ok = false break } } if !ok { continue } for _, ibId := range prep[idx].inboundIds { ib, _ := getIb(ibId) if _, seen := byInbound[ibId]; !seen { inboundOrder = append(inboundOrder, ibId) } byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib)) idxByInbound[ibId] = append(idxByInbound[ibId], idx) } } needRestart := false for _, ibId := range inboundOrder { payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]}) if e == nil { var nr bool nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs) if e == nil && nr { needRestart = true } } if e != nil { for _, idx := range idxByInbound[ibId] { failed[idx] = true if reason[idx] == "" { reason[idx] = e.Error() } } } } for idx := range prep { if failed[idx] { skip(prep[idx].client.Email, reason[idx]) } else { result.Created++ } } return result, needRestart, nil } func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) { db := database.GetDB() now := time.Now().UnixMilli() depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))" var rows []xray.ClientTraffic if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil { return 0, false, err } if len(rows) == 0 { return 0, false, nil } seen := make(map[string]struct{}, len(rows)) emails := make([]string, 0, len(rows)) for _, r := range rows { if r.Email == "" { continue } if _, ok := seen[r.Email]; ok { continue } seen[r.Email] = struct{}{} emails = append(emails, r.Email) } if len(emails) == 0 { return 0, false, nil } res, needRestart, err := s.BulkDelete(inboundSvc, emails, false) if err != nil { return res.Deleted, needRestart, err } return res.Deleted, needRestart, nil }