|
|
@@ -1291,3 +1291,300 @@ func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, erro
|
|
|
}
|
|
|
return res.Deleted, needRestart, nil
|
|
|
}
|
|
|
+
|
|
|
+type BulkSetEnableResult struct {
|
|
|
+ Changed int `json:"changed"`
|
|
|
+ Skipped []BulkSetEnableReport `json:"skipped,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+type BulkSetEnableReport struct {
|
|
|
+ Email string `json:"email"`
|
|
|
+ Reason string `json:"reason"`
|
|
|
+}
|
|
|
+
|
|
|
+func (s *ClientService) BulkSetEnable(inboundSvc *InboundService, emails []string, enable bool) (BulkSetEnableResult, bool, error) {
|
|
|
+ result := BulkSetEnableResult{}
|
|
|
+
|
|
|
+ seen := map[string]struct{}{}
|
|
|
+ cleanEmails := make([]string, 0, len(emails))
|
|
|
+ for _, e := range emails {
|
|
|
+ e = strings.TrimSpace(e)
|
|
|
+ if e == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if _, ok := seen[e]; ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ seen[e] = struct{}{}
|
|
|
+ cleanEmails = append(cleanEmails, e)
|
|
|
+ }
|
|
|
+ if len(cleanEmails) == 0 {
|
|
|
+ return result, false, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ db := database.GetDB()
|
|
|
+
|
|
|
+ var records []model.ClientRecord
|
|
|
+ for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
|
|
|
+ var rows []model.ClientRecord
|
|
|
+ if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
+ return result, false, err
|
|
|
+ }
|
|
|
+ records = append(records, rows...)
|
|
|
+ }
|
|
|
+ recordsByEmail := make(map[string]*model.ClientRecord, len(records))
|
|
|
+ for i := range records {
|
|
|
+ recordsByEmail[records[i].Email] = &records[i]
|
|
|
+ }
|
|
|
+
|
|
|
+ skippedReasons := map[string]string{}
|
|
|
+ for _, email := range cleanEmails {
|
|
|
+ if _, ok := recordsByEmail[email]; !ok {
|
|
|
+ skippedReasons[email] = "client not found"
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ clientIds := make([]int, 0, len(recordsByEmail))
|
|
|
+ recordIdToEmail := make(map[int]string, len(recordsByEmail))
|
|
|
+ for _, r := range recordsByEmail {
|
|
|
+ clientIds = append(clientIds, r.Id)
|
|
|
+ recordIdToEmail[r.Id] = r.Email
|
|
|
+ }
|
|
|
+
|
|
|
+ emailsByInbound := map[int][]string{}
|
|
|
+ if len(clientIds) > 0 {
|
|
|
+ var mappings []model.ClientInbound
|
|
|
+ for _, batch := range chunkInts(clientIds, sqlInChunk) {
|
|
|
+ var rows []model.ClientInbound
|
|
|
+ if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
+ return result, false, err
|
|
|
+ }
|
|
|
+ mappings = append(mappings, rows...)
|
|
|
+ }
|
|
|
+ for _, m := range mappings {
|
|
|
+ email, ok := recordIdToEmail[m.ClientId]
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ needRestart := false
|
|
|
+ for inboundId, ibEmails := range emailsByInbound {
|
|
|
+ ibRes := s.bulkSetEnableInboundClients(inboundSvc, inboundId, ibEmails, enable)
|
|
|
+ if ibRes.needRestart {
|
|
|
+ needRestart = true
|
|
|
+ }
|
|
|
+ for email, reason := range ibRes.perEmailSkipped {
|
|
|
+ if _, already := skippedReasons[email]; !already {
|
|
|
+ skippedReasons[email] = reason
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ successEmails := make([]string, 0, len(recordsByEmail))
|
|
|
+ for email := range recordsByEmail {
|
|
|
+ if _, skipped := skippedReasons[email]; skipped {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ successEmails = append(successEmails, email)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(successEmails) > 0 {
|
|
|
+ now := time.Now().UnixMilli()
|
|
|
+ if err := runSerializedTx(func(tx *gorm.DB) error {
|
|
|
+ for _, batch := range chunkStrings(successEmails, sqlInChunk) {
|
|
|
+ if e := tx.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("enable", enable).Error; e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ if e := tx.Model(&model.ClientRecord{}).Where("email IN ?", batch).
|
|
|
+ Updates(map[string]any{"enable": enable, "updated_at": now}).Error; e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }); err != nil {
|
|
|
+ return result, needRestart, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ result.Changed = len(successEmails)
|
|
|
+ for email, reason := range skippedReasons {
|
|
|
+ result.Skipped = append(result.Skipped, BulkSetEnableReport{Email: email, Reason: reason})
|
|
|
+ }
|
|
|
+ return result, needRestart, nil
|
|
|
+}
|
|
|
+
|
|
|
+type bulkSetEnableInboundResult struct {
|
|
|
+ perEmailSkipped map[string]string
|
|
|
+ needRestart bool
|
|
|
+}
|
|
|
+
|
|
|
+func (s *ClientService) bulkSetEnableInboundClients(inboundSvc *InboundService, inboundId int, emails []string, enable bool) bulkSetEnableInboundResult {
|
|
|
+ res := bulkSetEnableInboundResult{perEmailSkipped: map[string]string{}}
|
|
|
+
|
|
|
+ defer lockInbound(inboundId).Unlock()
|
|
|
+
|
|
|
+ oldInbound, err := inboundSvc.GetInbound(inboundId)
|
|
|
+ if err != nil {
|
|
|
+ for _, e := range emails {
|
|
|
+ res.perEmailSkipped[e] = err.Error()
|
|
|
+ }
|
|
|
+ return res
|
|
|
+ }
|
|
|
+
|
|
|
+ var settings map[string]any
|
|
|
+ if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
|
|
|
+ for _, e := range emails {
|
|
|
+ res.perEmailSkipped[e] = err.Error()
|
|
|
+ }
|
|
|
+ return res
|
|
|
+ }
|
|
|
+
|
|
|
+ wanted := make(map[string]struct{}, len(emails))
|
|
|
+ for _, email := range emails {
|
|
|
+ wanted[email] = struct{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ cipher := ""
|
|
|
+ if oldInbound.Protocol == model.Shadowsocks {
|
|
|
+ cipher, _ = settings["method"].(string)
|
|
|
+ }
|
|
|
+
|
|
|
+ type changedClient struct {
|
|
|
+ email string
|
|
|
+ wasEnable bool
|
|
|
+ client model.Client
|
|
|
+ }
|
|
|
+ var changed []changedClient
|
|
|
+ found := map[string]bool{}
|
|
|
+ nowMs := time.Now().UnixMilli()
|
|
|
+
|
|
|
+ interfaceClients, _ := settings["clients"].([]any)
|
|
|
+ for i, c := range interfaceClients {
|
|
|
+ entry, ok := c.(map[string]any)
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ email, _ := entry["email"].(string)
|
|
|
+ if _, want := wanted[email]; !want || email == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ found[email] = true
|
|
|
+ prev, _ := entry["enable"].(bool)
|
|
|
+ if prev == enable {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ entry["enable"] = enable
|
|
|
+ entry["updated_at"] = nowMs
|
|
|
+ interfaceClients[i] = entry
|
|
|
+ // Build the pushed client from the inbound JSON (the per-inbound source of
|
|
|
+ // truth), so a remote UpdateUser carries every field and never zeroes
|
|
|
+ // subId/totalGB/expiry from drifting ClientRecord columns (#4628/#4792).
|
|
|
+ var parsed model.Client
|
|
|
+ if b, mErr := json.Marshal(entry); mErr == nil {
|
|
|
+ _ = json.Unmarshal(b, &parsed)
|
|
|
+ }
|
|
|
+ parsed.Email = email
|
|
|
+ parsed.Enable = enable
|
|
|
+ changed = append(changed, changedClient{email: email, wasEnable: prev, client: parsed})
|
|
|
+ }
|
|
|
+
|
|
|
+ for email := range wanted {
|
|
|
+ if !found[email] {
|
|
|
+ res.perEmailSkipped[email] = "Client Not Found In Inbound"
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(changed) == 0 {
|
|
|
+ return res
|
|
|
+ }
|
|
|
+
|
|
|
+ settings["clients"] = interfaceClients
|
|
|
+ newSettings, err := json.MarshalIndent(settings, "", " ")
|
|
|
+ if err != nil {
|
|
|
+ for _, ch := range changed {
|
|
|
+ res.perEmailSkipped[ch.email] = err.Error()
|
|
|
+ }
|
|
|
+ return res
|
|
|
+ }
|
|
|
+ oldInbound.Settings = string(newSettings)
|
|
|
+
|
|
|
+ rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
|
|
|
+ if perr != nil {
|
|
|
+ for _, ch := range changed {
|
|
|
+ res.perEmailSkipped[ch.email] = perr.Error()
|
|
|
+ }
|
|
|
+ return res
|
|
|
+ }
|
|
|
+ markDirty := dirty
|
|
|
+ if oldInbound.NodeID != nil && push && len(changed) > nodeBulkPushThreshold {
|
|
|
+ markDirty = true
|
|
|
+ push = false
|
|
|
+ }
|
|
|
+
|
|
|
+ txErr := runSerializedTx(func(tx *gorm.DB) error {
|
|
|
+ if e := tx.Save(oldInbound).Error; e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ finalClients, gcErr := inboundSvc.GetClients(oldInbound)
|
|
|
+ if gcErr != nil {
|
|
|
+ return gcErr
|
|
|
+ }
|
|
|
+ return s.SyncInbound(tx, inboundId, finalClients)
|
|
|
+ })
|
|
|
+ if txErr != nil {
|
|
|
+ for _, ch := range changed {
|
|
|
+ res.perEmailSkipped[ch.email] = txErr.Error()
|
|
|
+ }
|
|
|
+ return res
|
|
|
+ }
|
|
|
+
|
|
|
+ if oldInbound.NodeID == nil {
|
|
|
+ if !push {
|
|
|
+ res.needRestart = true
|
|
|
+ } else {
|
|
|
+ for _, ch := range changed {
|
|
|
+ if enable {
|
|
|
+ err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
|
|
|
+ "email": ch.client.Email,
|
|
|
+ "id": ch.client.ID,
|
|
|
+ "security": ch.client.Security,
|
|
|
+ "flow": ch.client.Flow,
|
|
|
+ "auth": ch.client.Auth,
|
|
|
+ "password": ch.client.Password,
|
|
|
+ "cipher": cipher,
|
|
|
+ })
|
|
|
+ if err1 != nil {
|
|
|
+ logger.Debug("Error in adding client on", rt.Name(), ":", err1)
|
|
|
+ res.needRestart = true
|
|
|
+ }
|
|
|
+ } else if ch.wasEnable {
|
|
|
+ err1 := rt.RemoveUser(context.Background(), oldInbound, ch.email)
|
|
|
+ if err1 != nil && !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", ch.email)) {
|
|
|
+ logger.Debug("Error in removing client on", rt.Name(), ":", err1)
|
|
|
+ res.needRestart = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if push {
|
|
|
+ for _, ch := range changed {
|
|
|
+ updated := ch.client
|
|
|
+ updated.UpdatedAt = nowMs
|
|
|
+ if err1 := rt.UpdateUser(context.Background(), oldInbound, ch.email, updated); err1 != nil {
|
|
|
+ logger.Warning("Error in updating client on", rt.Name(), ":", err1)
|
|
|
+ markDirty = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if markDirty && oldInbound.NodeID != nil {
|
|
|
+ if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
|
|
|
+ logger.Warning("mark node dirty failed:", dErr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return res
|
|
|
+}
|