|
@@ -124,19 +124,23 @@ func compactOrphans(db *gorm.DB, clients []any) []any {
|
|
|
if len(emails) == 0 {
|
|
if len(emails) == 0 {
|
|
|
return clients
|
|
return clients
|
|
|
}
|
|
}
|
|
|
- var existingEmails []string
|
|
|
|
|
- if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails).Pluck("email", &existingEmails).Error; err != nil {
|
|
|
|
|
- logger.Warning("compactOrphans pluck:", err)
|
|
|
|
|
- return clients
|
|
|
|
|
|
|
+ existing := make(map[string]struct{}, len(emails))
|
|
|
|
|
+ const orphanChunk = 400
|
|
|
|
|
+ for start := 0; start < len(emails); start += orphanChunk {
|
|
|
|
|
+ end := min(start+orphanChunk, len(emails))
|
|
|
|
|
+ var found []string
|
|
|
|
|
+ if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails[start:end]).Pluck("email", &found).Error; err != nil {
|
|
|
|
|
+ logger.Warning("compactOrphans pluck:", err)
|
|
|
|
|
+ return clients
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, e := range found {
|
|
|
|
|
+ existing[e] = struct{}{}
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- if len(existingEmails) == len(emails) {
|
|
|
|
|
|
|
+ if len(existing) == len(emails) {
|
|
|
return clients
|
|
return clients
|
|
|
}
|
|
}
|
|
|
- existing := make(map[string]struct{}, len(existingEmails))
|
|
|
|
|
- for _, e := range existingEmails {
|
|
|
|
|
- existing[e] = struct{}{}
|
|
|
|
|
- }
|
|
|
|
|
- out := make([]any, 0, len(existingEmails))
|
|
|
|
|
|
|
+ out := make([]any, 0, len(existing))
|
|
|
for _, c := range clients {
|
|
for _, c := range clients {
|
|
|
cm, ok := c.(map[string]any)
|
|
cm, ok := c.(map[string]any)
|
|
|
if !ok {
|
|
if !ok {
|
|
@@ -170,6 +174,26 @@ func tombstoneClientEmail(email string) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func tombstoneClientEmails(emails []string) {
|
|
|
|
|
+ if len(emails) == 0 {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ now := time.Now()
|
|
|
|
|
+ cutoff := now.Add(-deleteTombstoneTTL)
|
|
|
|
|
+ recentlyDeletedMu.Lock()
|
|
|
|
|
+ defer recentlyDeletedMu.Unlock()
|
|
|
|
|
+ for _, email := range emails {
|
|
|
|
|
+ if email != "" {
|
|
|
|
|
+ recentlyDeleted[email] = now
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ for e, ts := range recentlyDeleted {
|
|
|
|
|
+ if ts.Before(cutoff) {
|
|
|
|
|
+ delete(recentlyDeleted, e)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func isClientEmailTombstoned(email string) bool {
|
|
func isClientEmailTombstoned(email string) bool {
|
|
|
if email == "" {
|
|
if email == "" {
|
|
|
return false
|
|
return false
|
|
@@ -196,73 +220,134 @@ func (s *ClientService) SyncInbound(tx *gorm.DB, inboundId int, clients []model.
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ emails := make([]string, 0, len(clients))
|
|
|
|
|
+ seen := make(map[string]struct{}, len(clients))
|
|
|
for i := range clients {
|
|
for i := range clients {
|
|
|
- c := clients[i]
|
|
|
|
|
- email := strings.TrimSpace(c.Email)
|
|
|
|
|
|
|
+ email := strings.TrimSpace(clients[i].Email)
|
|
|
if email == "" {
|
|
if email == "" {
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
+ if _, ok := seen[email]; ok {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ seen[email] = struct{}{}
|
|
|
|
|
+ emails = append(emails, email)
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- incoming := c.ToRecord()
|
|
|
|
|
- row := &model.ClientRecord{}
|
|
|
|
|
- err := tx.Where("email = ?", email).First(row).Error
|
|
|
|
|
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
|
|
|
|
+ existing := make(map[string]*model.ClientRecord, len(emails))
|
|
|
|
|
+ const selectChunk = 400
|
|
|
|
|
+ for start := 0; start < len(emails); start += selectChunk {
|
|
|
|
|
+ end := min(start+selectChunk, len(emails))
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if err := tx.Where("email IN ?", emails[start:end]).Find(&rows).Error; err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
|
|
- if err := tx.Create(incoming).Error; err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- row = incoming
|
|
|
|
|
- } else {
|
|
|
|
|
- if incoming.UUID != "" {
|
|
|
|
|
- row.UUID = incoming.UUID
|
|
|
|
|
- }
|
|
|
|
|
- if incoming.Password != "" {
|
|
|
|
|
- row.Password = incoming.Password
|
|
|
|
|
- }
|
|
|
|
|
- if incoming.Auth != "" {
|
|
|
|
|
- row.Auth = incoming.Auth
|
|
|
|
|
- }
|
|
|
|
|
- row.Flow = incoming.Flow
|
|
|
|
|
- if incoming.Security != "" {
|
|
|
|
|
- row.Security = incoming.Security
|
|
|
|
|
- }
|
|
|
|
|
- if incoming.Reverse != "" {
|
|
|
|
|
- row.Reverse = incoming.Reverse
|
|
|
|
|
- }
|
|
|
|
|
- row.SubID = incoming.SubID
|
|
|
|
|
- row.LimitIP = incoming.LimitIP
|
|
|
|
|
- row.TotalGB = incoming.TotalGB
|
|
|
|
|
- row.ExpiryTime = incoming.ExpiryTime
|
|
|
|
|
- row.Enable = incoming.Enable
|
|
|
|
|
- row.TgID = incoming.TgID
|
|
|
|
|
- if incoming.Group != "" {
|
|
|
|
|
- row.Group = incoming.Group
|
|
|
|
|
- }
|
|
|
|
|
- row.Comment = incoming.Comment
|
|
|
|
|
- row.Reset = incoming.Reset
|
|
|
|
|
- if incoming.CreatedAt > 0 && (row.CreatedAt == 0 || incoming.CreatedAt < row.CreatedAt) {
|
|
|
|
|
- row.CreatedAt = incoming.CreatedAt
|
|
|
|
|
- }
|
|
|
|
|
- preservedUpdatedAt := max(incoming.UpdatedAt, row.UpdatedAt)
|
|
|
|
|
- row.UpdatedAt = preservedUpdatedAt
|
|
|
|
|
- if err := tx.Save(row).Error; err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- if err := tx.Model(&model.ClientRecord{}).
|
|
|
|
|
- Where("id = ?", row.Id).
|
|
|
|
|
- UpdateColumn("updated_at", preservedUpdatedAt).Error; err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+ for i := range rows {
|
|
|
|
|
+ r := rows[i]
|
|
|
|
|
+ existing[r.Email] = &r
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ idByEmail := make(map[string]int, len(emails))
|
|
|
|
|
+ pending := make(map[string]*model.ClientRecord, len(emails))
|
|
|
|
|
+ toCreate := make([]*model.ClientRecord, 0, len(emails))
|
|
|
|
|
+ for i := range clients {
|
|
|
|
|
+ email := strings.TrimSpace(clients[i].Email)
|
|
|
|
|
+ if email == "" {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ incoming := clients[i].ToRecord()
|
|
|
|
|
+ row, ok := existing[email]
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ if _, dup := pending[email]; !dup {
|
|
|
|
|
+ pending[email] = incoming
|
|
|
|
|
+ toCreate = append(toCreate, incoming)
|
|
|
}
|
|
}
|
|
|
|
|
+ continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- link := model.ClientInbound{
|
|
|
|
|
- ClientId: row.Id,
|
|
|
|
|
- InboundId: inboundId,
|
|
|
|
|
- FlowOverride: c.Flow,
|
|
|
|
|
|
|
+ before := *row
|
|
|
|
|
+ if incoming.UUID != "" {
|
|
|
|
|
+ row.UUID = incoming.UUID
|
|
|
|
|
+ }
|
|
|
|
|
+ if incoming.Password != "" {
|
|
|
|
|
+ row.Password = incoming.Password
|
|
|
|
|
+ }
|
|
|
|
|
+ if incoming.Auth != "" {
|
|
|
|
|
+ row.Auth = incoming.Auth
|
|
|
|
|
+ }
|
|
|
|
|
+ row.Flow = incoming.Flow
|
|
|
|
|
+ if incoming.Security != "" {
|
|
|
|
|
+ row.Security = incoming.Security
|
|
|
|
|
+ }
|
|
|
|
|
+ if incoming.Reverse != "" {
|
|
|
|
|
+ row.Reverse = incoming.Reverse
|
|
|
|
|
+ }
|
|
|
|
|
+ row.SubID = incoming.SubID
|
|
|
|
|
+ row.LimitIP = incoming.LimitIP
|
|
|
|
|
+ row.TotalGB = incoming.TotalGB
|
|
|
|
|
+ row.ExpiryTime = incoming.ExpiryTime
|
|
|
|
|
+ row.Enable = incoming.Enable
|
|
|
|
|
+ row.TgID = incoming.TgID
|
|
|
|
|
+ if incoming.Group != "" {
|
|
|
|
|
+ row.Group = incoming.Group
|
|
|
|
|
+ }
|
|
|
|
|
+ row.Comment = incoming.Comment
|
|
|
|
|
+ row.Reset = incoming.Reset
|
|
|
|
|
+ if incoming.CreatedAt > 0 && (row.CreatedAt == 0 || incoming.CreatedAt < row.CreatedAt) {
|
|
|
|
|
+ row.CreatedAt = incoming.CreatedAt
|
|
|
|
|
+ }
|
|
|
|
|
+ preservedUpdatedAt := max(incoming.UpdatedAt, row.UpdatedAt)
|
|
|
|
|
+ row.UpdatedAt = preservedUpdatedAt
|
|
|
|
|
+
|
|
|
|
|
+ idByEmail[email] = row.Id
|
|
|
|
|
+
|
|
|
|
|
+ if *row == before {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := tx.Save(row).Error; err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := tx.Model(&model.ClientRecord{}).
|
|
|
|
|
+ Where("id = ?", row.Id).
|
|
|
|
|
+ UpdateColumn("updated_at", preservedUpdatedAt).Error; err != nil {
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
- if err := tx.Create(&link).Error; err != nil {
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if len(toCreate) > 0 {
|
|
|
|
|
+ if err := tx.CreateInBatches(toCreate, 200).Error; err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, rec := range toCreate {
|
|
|
|
|
+ idByEmail[rec.Email] = rec.Id
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ links := make([]model.ClientInbound, 0, len(clients))
|
|
|
|
|
+ linked := make(map[int]struct{}, len(clients))
|
|
|
|
|
+ for i := range clients {
|
|
|
|
|
+ email := strings.TrimSpace(clients[i].Email)
|
|
|
|
|
+ if email == "" {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ id, ok := idByEmail[email]
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, dup := linked[id]; dup {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ linked[id] = struct{}{}
|
|
|
|
|
+ links = append(links, model.ClientInbound{
|
|
|
|
|
+ ClientId: id,
|
|
|
|
|
+ InboundId: inboundId,
|
|
|
|
|
+ FlowOverride: clients[i].Flow,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(links) > 0 {
|
|
|
|
|
+ if err := tx.CreateInBatches(links, 200).Error; err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -397,20 +482,26 @@ func (s *ClientService) List() ([]ClientWithAttachments, error) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- var links []model.ClientInbound
|
|
|
|
|
- if err := db.Where("client_id IN ?", clientIds).Find(&links).Error; err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
attachments := make(map[int][]int, len(rows))
|
|
attachments := make(map[int][]int, len(rows))
|
|
|
- for _, l := range links {
|
|
|
|
|
- attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId)
|
|
|
|
|
|
|
+ for _, batch := range chunkInts(clientIds, sqlInChunk) {
|
|
|
|
|
+ var links []model.ClientInbound
|
|
|
|
|
+ if err := db.Where("client_id IN ?", batch).Find(&links).Error; err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, l := range links {
|
|
|
|
|
+ attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
trafficByEmail := make(map[string]*xray.ClientTraffic, len(emails))
|
|
trafficByEmail := make(map[string]*xray.ClientTraffic, len(emails))
|
|
|
if len(emails) > 0 {
|
|
if len(emails) > 0 {
|
|
|
var stats []xray.ClientTraffic
|
|
var stats []xray.ClientTraffic
|
|
|
- if err := db.Where("email IN ?", emails).Find(&stats).Error; err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(emails, sqlInChunk) {
|
|
|
|
|
+ var batchStats []xray.ClientTraffic
|
|
|
|
|
+ if err := db.Where("email IN ?", batch).Find(&batchStats).Error; err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ stats = append(stats, batchStats...)
|
|
|
}
|
|
}
|
|
|
for i := range stats {
|
|
for i := range stats {
|
|
|
trafficByEmail[stats[i].Email] = &stats[i]
|
|
trafficByEmail[stats[i].Email] = &stats[i]
|
|
@@ -634,7 +725,7 @@ func applyShadowsocksClientMethod(clients []any, settings map[string]any) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (s *ClientService) Update(inboundSvc *InboundService, id int, updated model.Client) (bool, error) {
|
|
|
|
|
|
|
+func (s *ClientService) Update(inboundSvc *InboundService, id int, updated model.Client, inboundFilter ...int) (bool, error) {
|
|
|
existing, err := s.GetByID(id)
|
|
existing, err := s.GetByID(id)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return false, err
|
|
return false, err
|
|
@@ -643,6 +734,19 @@ func (s *ClientService) Update(inboundSvc *InboundService, id int, updated model
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return false, err
|
|
return false, err
|
|
|
}
|
|
}
|
|
|
|
|
+ if len(inboundFilter) > 0 {
|
|
|
|
|
+ allow := make(map[int]struct{}, len(inboundFilter))
|
|
|
|
|
+ for _, fid := range inboundFilter {
|
|
|
|
|
+ allow[fid] = struct{}{}
|
|
|
|
|
+ }
|
|
|
|
|
+ filtered := inboundIds[:0:0]
|
|
|
|
|
+ for _, ibId := range inboundIds {
|
|
|
|
|
+ if _, ok := allow[ibId]; ok {
|
|
|
|
|
+ filtered = append(filtered, ibId)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ inboundIds = filtered
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
if strings.TrimSpace(updated.Email) == "" {
|
|
if strings.TrimSpace(updated.Email) == "" {
|
|
|
return false, common.NewError("client email is required")
|
|
return false, common.NewError("client email is required")
|
|
@@ -1170,13 +1274,25 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
|
|
|
}
|
|
}
|
|
|
oldInbound.Settings = string(newSettings)
|
|
oldInbound.Settings = string(newSettings)
|
|
|
|
|
|
|
|
|
|
+ var sharedSet map[string]bool
|
|
|
|
|
+ if !keepTraffic {
|
|
|
|
|
+ removedEmails := make([]string, 0, len(removed))
|
|
|
|
|
+ for _, r := range removed {
|
|
|
|
|
+ if r.email != "" {
|
|
|
|
|
+ removedEmails = append(removedEmails, r.email)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ var sharedErr error
|
|
|
|
|
+ sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(removedEmails, inboundId)
|
|
|
|
|
+ if sharedErr != nil {
|
|
|
|
|
+ return false, sharedErr
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
needRestart := false
|
|
needRestart := false
|
|
|
for _, r := range removed {
|
|
for _, r := range removed {
|
|
|
email := r.email
|
|
email := r.email
|
|
|
- emailShared, err := inboundSvc.emailUsedByOtherInbounds(email, inboundId)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return needRestart, err
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))]
|
|
|
if !emailShared && !keepTraffic {
|
|
if !emailShared && !keepTraffic {
|
|
|
if err := inboundSvc.DelClientIPs(db, email); err != nil {
|
|
if err := inboundSvc.DelClientIPs(db, email); err != nil {
|
|
|
logger.Error("Error in delete client IPs")
|
|
logger.Error("Error in delete client IPs")
|
|
@@ -1317,7 +1433,7 @@ func (s *ClientService) findInboundIdsByClientEmail(email string) ([]int, error)
|
|
|
return out, nil
|
|
return out, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (s *ClientService) UpdateByEmail(inboundSvc *InboundService, email string, updated model.Client) (bool, error) {
|
|
|
|
|
|
|
+func (s *ClientService) UpdateByEmail(inboundSvc *InboundService, email string, updated model.Client, inboundFilter ...int) (bool, error) {
|
|
|
if email == "" {
|
|
if email == "" {
|
|
|
return false, common.NewError("client email is required")
|
|
return false, common.NewError("client email is required")
|
|
|
}
|
|
}
|
|
@@ -1325,7 +1441,7 @@ func (s *ClientService) UpdateByEmail(inboundSvc *InboundService, email string,
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return false, err
|
|
return false, err
|
|
|
}
|
|
}
|
|
|
- return s.Update(inboundSvc, rec.Id, updated)
|
|
|
|
|
|
|
+ return s.Update(inboundSvc, rec.Id, updated, inboundFilter...)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientService) ResetTrafficByEmail(inboundSvc *InboundService, email string) (bool, error) {
|
|
func (s *ClientService) ResetTrafficByEmail(inboundSvc *InboundService, email string) (bool, error) {
|
|
@@ -1631,14 +1747,43 @@ func (s *ClientService) BulkResetTraffic(inboundSvc *InboundService, emails []st
|
|
|
if len(emails) == 0 {
|
|
if len(emails) == 0 {
|
|
|
return 0, nil
|
|
return 0, nil
|
|
|
}
|
|
}
|
|
|
- count := 0
|
|
|
|
|
- for _, email := range emails {
|
|
|
|
|
- if _, err := s.ResetTrafficByEmail(inboundSvc, email); err != nil {
|
|
|
|
|
- return count, err
|
|
|
|
|
|
|
+ seen := map[string]struct{}{}
|
|
|
|
|
+ cleanEmails := make([]string, 0, len(emails))
|
|
|
|
|
+ for _, e := range emails {
|
|
|
|
|
+ e = strings.TrimSpace(e)
|
|
|
|
|
+ if e == "" {
|
|
|
|
|
+ continue
|
|
|
}
|
|
}
|
|
|
- count++
|
|
|
|
|
|
|
+ if _, ok := seen[e]; ok {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ seen[e] = struct{}{}
|
|
|
|
|
+ cleanEmails = append(cleanEmails, e)
|
|
|
}
|
|
}
|
|
|
- return count, nil
|
|
|
|
|
|
|
+ if len(cleanEmails) == 0 {
|
|
|
|
|
+ return 0, nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ affected := 0
|
|
|
|
|
+ err := submitTrafficWrite(func() error {
|
|
|
|
|
+ db := database.GetDB()
|
|
|
|
|
+ return db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
+ for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
|
|
|
|
|
+ res := tx.Model(xray.ClientTraffic{}).
|
|
|
|
|
+ Where("email IN ?", batch).
|
|
|
|
|
+ Updates(map[string]any{"enable": true, "up": 0, "down": 0})
|
|
|
|
|
+ if res.Error != nil {
|
|
|
|
|
+ return res.Error
|
|
|
|
|
+ }
|
|
|
|
|
+ affected += int(res.RowsAffected)
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil
|
|
|
|
|
+ })
|
|
|
|
|
+ })
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return affected, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientService) CreateGroup(name string) error {
|
|
func (s *ClientService) CreateGroup(name string) error {
|
|
@@ -1710,8 +1855,12 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var records []model.ClientRecord
|
|
var records []model.ClientRecord
|
|
|
- if err := db.Where("email IN ?", emails).Find(&records).Error; err != nil {
|
|
|
|
|
- return 0, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(emails, sqlInChunk) {
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ records = append(records, rows...)
|
|
|
}
|
|
}
|
|
|
if len(records) == 0 {
|
|
if len(records) == 0 {
|
|
|
return 0, nil
|
|
return 0, nil
|
|
@@ -1722,21 +1871,33 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
tx := db.Begin()
|
|
tx := db.Begin()
|
|
|
- if err := tx.Model(&model.ClientRecord{}).
|
|
|
|
|
- Where("email IN ?", affectedEmails).
|
|
|
|
|
- UpdateColumn("group_name", group).Error; err != nil {
|
|
|
|
|
- tx.Rollback()
|
|
|
|
|
- return 0, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
|
|
|
|
|
+ if err := tx.Model(&model.ClientRecord{}).
|
|
|
|
|
+ Where("email IN ?", batch).
|
|
|
|
|
+ UpdateColumn("group_name", group).Error; err != nil {
|
|
|
|
|
+ tx.Rollback()
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var inboundIDs []int
|
|
var inboundIDs []int
|
|
|
- if err := tx.Table("client_inbounds").
|
|
|
|
|
- Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
|
|
- Where("clients.email IN ?", affectedEmails).
|
|
|
|
|
- Distinct("client_inbounds.inbound_id").
|
|
|
|
|
- Pluck("inbound_id", &inboundIDs).Error; err != nil {
|
|
|
|
|
- tx.Rollback()
|
|
|
|
|
- return 0, err
|
|
|
|
|
|
|
+ inboundIDSeen := make(map[int]struct{})
|
|
|
|
|
+ for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
|
|
|
|
|
+ var ids []int
|
|
|
|
|
+ if err := tx.Table("client_inbounds").
|
|
|
|
|
+ Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
|
|
+ Where("clients.email IN ?", batch).
|
|
|
|
|
+ Distinct("client_inbounds.inbound_id").
|
|
|
|
|
+ Pluck("inbound_id", &ids).Error; err != nil {
|
|
|
|
|
+ tx.Rollback()
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, id := range ids {
|
|
|
|
|
+ if _, ok := inboundIDSeen[id]; !ok {
|
|
|
|
|
+ inboundIDSeen[id] = struct{}{}
|
|
|
|
|
+ inboundIDs = append(inboundIDs, id)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
emailSet := make(map[string]struct{}, len(affectedEmails))
|
|
emailSet := make(map[string]struct{}, len(affectedEmails))
|
|
@@ -1828,13 +1989,23 @@ func (s *ClientService) replaceGroupValue(oldName, newName string) (int, error)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var inboundIDs []int
|
|
var inboundIDs []int
|
|
|
- if err := tx.Table("client_inbounds").
|
|
|
|
|
- Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
|
|
- Where("clients.email IN ?", affectedEmails).
|
|
|
|
|
- Distinct("client_inbounds.inbound_id").
|
|
|
|
|
- Pluck("inbound_id", &inboundIDs).Error; err != nil {
|
|
|
|
|
- tx.Rollback()
|
|
|
|
|
- return 0, err
|
|
|
|
|
|
|
+ inboundIDSeen := make(map[int]struct{})
|
|
|
|
|
+ for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
|
|
|
|
|
+ var ids []int
|
|
|
|
|
+ if err := tx.Table("client_inbounds").
|
|
|
|
|
+ Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
|
|
+ Where("clients.email IN ?", batch).
|
|
|
|
|
+ Distinct("client_inbounds.inbound_id").
|
|
|
|
|
+ Pluck("inbound_id", &ids).Error; err != nil {
|
|
|
|
|
+ tx.Rollback()
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, id := range ids {
|
|
|
|
|
+ if _, ok := inboundIDSeen[id]; !ok {
|
|
|
|
|
+ inboundIDSeen[id] = struct{}{}
|
|
|
|
|
+ inboundIDs = append(inboundIDs, id)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for _, ibID := range inboundIDs {
|
|
for _, ibID := range inboundIDs {
|
|
@@ -2304,8 +2475,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
db := database.GetDB()
|
|
db := database.GetDB()
|
|
|
|
|
|
|
|
var records []model.ClientRecord
|
|
var records []model.ClientRecord
|
|
|
- if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
|
|
|
|
|
- return result, false, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
|
|
+ return result, false, err
|
|
|
|
|
+ }
|
|
|
|
|
+ records = append(records, rows...)
|
|
|
}
|
|
}
|
|
|
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
|
|
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
|
|
|
for i := range records {
|
|
for i := range records {
|
|
@@ -2381,8 +2556,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var mappings []model.ClientInbound
|
|
var mappings []model.ClientInbound
|
|
|
- if err := db.Where("client_id IN ?", plannedIds).Find(&mappings).Error; err != nil {
|
|
|
|
|
- return result, false, err
|
|
|
|
|
|
|
+ for _, batch := range chunkInts(plannedIds, sqlInChunk) {
|
|
|
|
|
+ var rows []model.ClientInbound
|
|
|
|
|
+ if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
|
|
+ return result, false, err
|
|
|
|
|
+ }
|
|
|
|
|
+ mappings = append(mappings, rows...)
|
|
|
}
|
|
}
|
|
|
emailsByInbound := map[int][]string{}
|
|
emailsByInbound := map[int][]string{}
|
|
|
for _, m := range mappings {
|
|
for _, m := range mappings {
|
|
@@ -2570,20 +2749,22 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
db := database.GetDB()
|
|
db := database.GetDB()
|
|
|
- if err := db.Save(oldInbound).Error; err != nil {
|
|
|
|
|
|
|
+ txErr := db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
+ if err := tx.Save(oldInbound).Error; err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ finalClients, gcErr := inboundSvc.GetClients(oldInbound)
|
|
|
|
|
+ if gcErr != nil {
|
|
|
|
|
+ return gcErr
|
|
|
|
|
+ }
|
|
|
|
|
+ return s.SyncInbound(tx, inboundId, finalClients)
|
|
|
|
|
+ })
|
|
|
|
|
+ if txErr != nil {
|
|
|
for email := range foundEmails {
|
|
for email := range foundEmails {
|
|
|
if _, skip := res.perEmailSkipped[email]; !skip {
|
|
if _, skip := res.perEmailSkipped[email]; !skip {
|
|
|
- res.perEmailSkipped[email] = err.Error()
|
|
|
|
|
|
|
+ res.perEmailSkipped[email] = txErr.Error()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- return res
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- finalClients, gcErr := inboundSvc.GetClients(oldInbound)
|
|
|
|
|
- if gcErr == nil {
|
|
|
|
|
- if syncErr := s.SyncInbound(db, inboundId, finalClients); syncErr != nil {
|
|
|
|
|
- logger.Warning("bulkAdjust SyncInbound:", syncErr)
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return res
|
|
return res
|
|
@@ -2601,6 +2782,8 @@ type BulkDeleteReport struct {
|
|
|
Reason string `json:"reason"`
|
|
Reason string `json:"reason"`
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+const sqlInChunk = 400
|
|
|
|
|
+
|
|
|
// BulkDelete removes every client in the list in one optimized pass.
|
|
// BulkDelete removes every client in the list in one optimized pass.
|
|
|
// Instead of running the full single-delete pipeline N times (which would
|
|
// Instead of running the full single-delete pipeline N times (which would
|
|
|
// re-read, re-parse, and re-write each inbound's settings JSON for every
|
|
// re-read, re-parse, and re-write each inbound's settings JSON for every
|
|
@@ -2631,14 +2814,20 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
|
|
|
db := database.GetDB()
|
|
db := database.GetDB()
|
|
|
|
|
|
|
|
var records []model.ClientRecord
|
|
var records []model.ClientRecord
|
|
|
- if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
|
|
|
|
|
- return result, false, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
|
|
+ return result, false, err
|
|
|
|
|
+ }
|
|
|
|
|
+ records = append(records, rows...)
|
|
|
}
|
|
}
|
|
|
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
|
|
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
|
|
|
|
|
+ tombstoneEmails := make([]string, 0, len(records))
|
|
|
for i := range records {
|
|
for i := range records {
|
|
|
recordsByEmail[records[i].Email] = &records[i]
|
|
recordsByEmail[records[i].Email] = &records[i]
|
|
|
- tombstoneClientEmail(records[i].Email)
|
|
|
|
|
|
|
+ tombstoneEmails = append(tombstoneEmails, records[i].Email)
|
|
|
}
|
|
}
|
|
|
|
|
+ tombstoneClientEmails(tombstoneEmails)
|
|
|
|
|
|
|
|
skippedReasons := map[string]string{}
|
|
skippedReasons := map[string]string{}
|
|
|
for _, email := range cleanEmails {
|
|
for _, email := range cleanEmails {
|
|
@@ -2657,8 +2846,12 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
|
|
|
emailsByInbound := map[int][]string{}
|
|
emailsByInbound := map[int][]string{}
|
|
|
if len(clientIds) > 0 {
|
|
if len(clientIds) > 0 {
|
|
|
var mappings []model.ClientInbound
|
|
var mappings []model.ClientInbound
|
|
|
- if err := db.Where("client_id IN ?", clientIds).Find(&mappings).Error; err != nil {
|
|
|
|
|
- return result, false, err
|
|
|
|
|
|
|
+ for _, batch := range chunkInts(clientIds, sqlInChunk) {
|
|
|
|
|
+ var rows []model.ClientInbound
|
|
|
|
|
+ if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
|
|
|
|
|
+ return result, false, err
|
|
|
|
|
+ }
|
|
|
|
|
+ mappings = append(mappings, rows...)
|
|
|
}
|
|
}
|
|
|
for _, m := range mappings {
|
|
for _, m := range mappings {
|
|
|
email, ok := recordIdToEmail[m.ClientId]
|
|
email, ok := recordIdToEmail[m.ClientId]
|
|
@@ -2693,20 +2886,26 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if len(successIds) > 0 {
|
|
if len(successIds) > 0 {
|
|
|
- if err := db.Where("client_id IN ?", successIds).Delete(&model.ClientInbound{}).Error; err != nil {
|
|
|
|
|
- return result, needRestart, err
|
|
|
|
|
|
|
+ for _, batch := range chunkInts(successIds, sqlInChunk) {
|
|
|
|
|
+ if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil {
|
|
|
|
|
+ return result, needRestart, err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
if !keepTraffic && len(successEmails) > 0 {
|
|
if !keepTraffic && len(successEmails) > 0 {
|
|
|
- if err := db.Where("email IN ?", successEmails).Delete(&xray.ClientTraffic{}).Error; err != nil {
|
|
|
|
|
- return result, needRestart, err
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(successEmails, sqlInChunk) {
|
|
|
|
|
+ if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil {
|
|
|
|
|
+ return result, needRestart, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil {
|
|
|
|
|
+ return result, needRestart, err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- if err := db.Where("client_email IN ?", successEmails).Delete(&model.InboundClientIps{}).Error; err != nil {
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, batch := range chunkInts(successIds, sqlInChunk) {
|
|
|
|
|
+ if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil {
|
|
|
return result, needRestart, err
|
|
return result, needRestart, err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- if err := db.Where("id IN ?", successIds).Delete(&model.ClientRecord{}).Error; err != nil {
|
|
|
|
|
- return result, needRestart, err
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
result.Deleted = len(successEmails)
|
|
result.Deleted = len(successEmails)
|
|
@@ -2835,38 +3034,52 @@ func (s *ClientService) bulkDelInboundClients(
|
|
|
Email string
|
|
Email string
|
|
|
Enable bool
|
|
Enable bool
|
|
|
}
|
|
}
|
|
|
- var rows []trafficRow
|
|
|
|
|
- if err := db.Model(xray.ClientTraffic{}).
|
|
|
|
|
- Where("email IN ?", foundList).
|
|
|
|
|
- Select("email, enable").
|
|
|
|
|
- Scan(&rows).Error; err == nil {
|
|
|
|
|
- for _, r := range rows {
|
|
|
|
|
- notDepletedByEmail[r.Email] = r.Enable
|
|
|
|
|
|
|
+ for _, batch := range chunkStrings(foundList, sqlInChunk) {
|
|
|
|
|
+ var rows []trafficRow
|
|
|
|
|
+ if err := db.Model(xray.ClientTraffic{}).
|
|
|
|
|
+ Where("email IN ?", batch).
|
|
|
|
|
+ Select("email, enable").
|
|
|
|
|
+ Scan(&rows).Error; err == nil {
|
|
|
|
|
+ for _, r := range rows {
|
|
|
|
|
+ notDepletedByEmail[r.Email] = r.Enable
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- for email := range foundEmails {
|
|
|
|
|
- shared, sharedErr := inboundSvc.emailUsedByOtherInbounds(email, inboundId)
|
|
|
|
|
|
|
+ var sharedSet map[string]bool
|
|
|
|
|
+ if !keepTraffic {
|
|
|
|
|
+ var sharedErr error
|
|
|
|
|
+ sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
|
|
|
if sharedErr != nil {
|
|
if sharedErr != nil {
|
|
|
- res.perEmailSkipped[email] = sharedErr.Error()
|
|
|
|
|
- delete(foundEmails, email)
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- if shared || keepTraffic {
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ for email := range foundEmails {
|
|
|
|
|
+ res.perEmailSkipped[email] = sharedErr.Error()
|
|
|
|
|
+ delete(foundEmails, email)
|
|
|
|
|
+ }
|
|
|
|
|
+ return res
|
|
|
}
|
|
}
|
|
|
- if delErr := inboundSvc.DelClientIPs(db, email); delErr != nil {
|
|
|
|
|
- logger.Error("Error in delete client IPs")
|
|
|
|
|
- res.perEmailSkipped[email] = delErr.Error()
|
|
|
|
|
- delete(foundEmails, email)
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ if !keepTraffic {
|
|
|
|
|
+ purge := make([]string, 0, len(foundEmails))
|
|
|
|
|
+ for email := range foundEmails {
|
|
|
|
|
+ if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
|
|
|
|
|
+ purge = append(purge, email)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- if delErr := inboundSvc.DelClientStat(db, email); delErr != nil {
|
|
|
|
|
- logger.Error("Delete stats Data Error")
|
|
|
|
|
- res.perEmailSkipped[email] = delErr.Error()
|
|
|
|
|
- delete(foundEmails, email)
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ if len(purge) > 0 {
|
|
|
|
|
+ if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil {
|
|
|
|
|
+ logger.Error("Error in delete client IPs")
|
|
|
|
|
+ for _, email := range purge {
|
|
|
|
|
+ res.perEmailSkipped[email] = delErr.Error()
|
|
|
|
|
+ delete(foundEmails, email)
|
|
|
|
|
+ }
|
|
|
|
|
+ } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil {
|
|
|
|
|
+ logger.Error("Delete stats Data Error")
|
|
|
|
|
+ for _, email := range purge {
|
|
|
|
|
+ res.perEmailSkipped[email] = delErr.Error()
|
|
|
|
|
+ delete(foundEmails, email)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -2907,21 +3120,22 @@ func (s *ClientService) bulkDelInboundClients(
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if err := db.Save(oldInbound).Error; err != nil {
|
|
|
|
|
|
|
+ txErr := db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
+ if err := tx.Save(oldInbound).Error; err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ finalClients, err := inboundSvc.GetClients(oldInbound)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ return s.SyncInbound(tx, inboundId, finalClients)
|
|
|
|
|
+ })
|
|
|
|
|
+ if txErr != nil {
|
|
|
for email := range foundEmails {
|
|
for email := range foundEmails {
|
|
|
if _, skip := res.perEmailSkipped[email]; !skip {
|
|
if _, skip := res.perEmailSkipped[email]; !skip {
|
|
|
- res.perEmailSkipped[email] = err.Error()
|
|
|
|
|
|
|
+ res.perEmailSkipped[email] = txErr.Error()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- return res
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- finalClients, err := inboundSvc.GetClients(oldInbound)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return res
|
|
|
|
|
- }
|
|
|
|
|
- if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
|
|
|
|
|
- return res
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return res
|
|
return res
|
|
@@ -2938,27 +3152,200 @@ type BulkCreateReport struct {
|
|
|
Reason string `json:"reason"`
|
|
Reason string `json:"reason"`
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// BulkCreate iterates payloads sequentially. Each item is the same shape
|
|
|
|
|
-// the single-create endpoint accepts, so callers can submit a heterogeneous
|
|
|
|
|
-// list (different inboundIds, plans, etc.) in one round-trip.
|
|
|
|
|
func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
|
|
func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
|
|
|
result := BulkCreateResult{}
|
|
result := BulkCreateResult{}
|
|
|
- needRestart := false
|
|
|
|
|
|
|
+ if len(payloads) == 0 {
|
|
|
|
|
+ return result, false, nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ skip := func(email, reason string) {
|
|
|
|
|
+ if strings.TrimSpace(email) == "" {
|
|
|
|
|
+ email = "(missing email)"
|
|
|
|
|
+ }
|
|
|
|
|
+ result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ emailSubIDs = nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ type prepared struct {
|
|
|
|
|
+ client model.Client
|
|
|
|
|
+ inboundIds []int
|
|
|
|
|
+ }
|
|
|
|
|
+ prep := make([]prepared, 0, len(payloads))
|
|
|
|
|
+ emails := make([]string, 0, len(payloads))
|
|
|
|
|
+ subIDs := make([]string, 0, len(payloads))
|
|
|
|
|
+ seenEmail := make(map[string]struct{}, len(payloads))
|
|
|
|
|
+ seenSubID := make(map[string]string, len(payloads))
|
|
|
|
|
+
|
|
|
for i := range payloads {
|
|
for i := range payloads {
|
|
|
- p := payloads[i]
|
|
|
|
|
- email := strings.TrimSpace(p.Client.Email)
|
|
|
|
|
- nr, err := s.Create(inboundSvc, &p)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- if email == "" {
|
|
|
|
|
- email = "(missing email)"
|
|
|
|
|
|
|
+ client := payloads[i].Client
|
|
|
|
|
+ email := strings.TrimSpace(client.Email)
|
|
|
|
|
+ if email == "" {
|
|
|
|
|
+ skip("", "client email is required")
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if verr := validateClientEmail(email); verr != nil {
|
|
|
|
|
+ skip(email, verr.Error())
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if verr := validateClientSubID(client.SubID); verr != nil {
|
|
|
|
|
+ skip(email, verr.Error())
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(payloads[i].InboundIds) == 0 {
|
|
|
|
|
+ skip(email, "at least one inbound is required")
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ client.Email = email
|
|
|
|
|
+ if client.SubID == "" {
|
|
|
|
|
+ client.SubID = uuid.NewString()
|
|
|
|
|
+ }
|
|
|
|
|
+ if !client.Enable {
|
|
|
|
|
+ client.Enable = true
|
|
|
|
|
+ }
|
|
|
|
|
+ now := time.Now().UnixMilli()
|
|
|
|
|
+ if client.CreatedAt == 0 {
|
|
|
|
|
+ client.CreatedAt = now
|
|
|
|
|
+ }
|
|
|
|
|
+ client.UpdatedAt = now
|
|
|
|
|
+
|
|
|
|
|
+ le := strings.ToLower(email)
|
|
|
|
|
+ if _, dup := seenEmail[le]; dup {
|
|
|
|
|
+ skip(email, "email already in use: "+email)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if owner, ok := seenSubID[client.SubID]; ok && owner != le {
|
|
|
|
|
+ skip(email, "subId already in use: "+client.SubID)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ seenEmail[le] = struct{}{}
|
|
|
|
|
+ seenSubID[client.SubID] = le
|
|
|
|
|
+
|
|
|
|
|
+ prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
|
|
|
|
|
+ emails = append(emails, email)
|
|
|
|
|
+ subIDs = append(subIDs, client.SubID)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if len(prep) == 0 {
|
|
|
|
|
+ return result, false, nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ db := database.GetDB()
|
|
|
|
|
+ const lookupChunk = 400
|
|
|
|
|
+ existingEmailSub := make(map[string]string, len(emails))
|
|
|
|
|
+ for start := 0; start < len(emails); start += lookupChunk {
|
|
|
|
|
+ end := min(start+lookupChunk, len(emails))
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
|
|
|
|
|
+ return result, false, e
|
|
|
|
|
+ }
|
|
|
|
|
+ for i := range rows {
|
|
|
|
|
+ existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ existingSubOwner := make(map[string]string, len(subIDs))
|
|
|
|
|
+ for start := 0; start < len(subIDs); start += lookupChunk {
|
|
|
|
|
+ end := min(start+lookupChunk, len(subIDs))
|
|
|
|
|
+ var rows []model.ClientRecord
|
|
|
|
|
+ if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
|
|
|
|
|
+ return result, false, e
|
|
|
|
|
+ }
|
|
|
|
|
+ for i := range rows {
|
|
|
|
|
+ existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ inboundCache := make(map[int]*model.Inbound)
|
|
|
|
|
+ getIb := func(id int) (*model.Inbound, error) {
|
|
|
|
|
+ if ib, ok := inboundCache[id]; ok {
|
|
|
|
|
+ return ib, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ ib, e := inboundSvc.GetInbound(id)
|
|
|
|
|
+ if e != nil {
|
|
|
|
|
+ return nil, e
|
|
|
|
|
+ }
|
|
|
|
|
+ inboundCache[id] = ib
|
|
|
|
|
+ return ib, nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ byInbound := make(map[int][]model.Client)
|
|
|
|
|
+ idxByInbound := make(map[int][]int)
|
|
|
|
|
+ inboundOrder := make([]int, 0)
|
|
|
|
|
+ failed := make([]bool, len(prep))
|
|
|
|
|
+ reason := make([]string, len(prep))
|
|
|
|
|
+
|
|
|
|
|
+ for idx := range prep {
|
|
|
|
|
+ le := strings.ToLower(prep[idx].client.Email)
|
|
|
|
|
+ if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
|
|
|
|
|
+ failed[idx] = true
|
|
|
|
|
+ reason[idx] = "email already in use: " + prep[idx].client.Email
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
|
|
|
|
|
+ failed[idx] = true
|
|
|
|
|
+ reason[idx] = "subId already in use: " + prep[idx].client.SubID
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ok := true
|
|
|
|
|
+ for _, ibId := range prep[idx].inboundIds {
|
|
|
|
|
+ ib, e := getIb(ibId)
|
|
|
|
|
+ if e != nil {
|
|
|
|
|
+ failed[idx] = true
|
|
|
|
|
+ reason[idx] = e.Error()
|
|
|
|
|
+ ok = false
|
|
|
|
|
+ break
|
|
|
}
|
|
}
|
|
|
- result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: err.Error()})
|
|
|
|
|
|
|
+ if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
|
|
|
|
|
+ failed[idx] = true
|
|
|
|
|
+ reason[idx] = e.Error()
|
|
|
|
|
+ ok = false
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if !ok {
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
- if nr {
|
|
|
|
|
- needRestart = true
|
|
|
|
|
|
|
+ for _, ibId := range prep[idx].inboundIds {
|
|
|
|
|
+ ib, _ := getIb(ibId)
|
|
|
|
|
+ if _, seen := byInbound[ibId]; !seen {
|
|
|
|
|
+ inboundOrder = append(inboundOrder, ibId)
|
|
|
|
|
+ }
|
|
|
|
|
+ byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
|
|
|
|
|
+ idxByInbound[ibId] = append(idxByInbound[ibId], idx)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ needRestart := false
|
|
|
|
|
+ for _, ibId := range inboundOrder {
|
|
|
|
|
+ payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
|
|
|
|
|
+ if e == nil {
|
|
|
|
|
+ var nr bool
|
|
|
|
|
+ nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
|
|
|
|
|
+ if e == nil && nr {
|
|
|
|
|
+ needRestart = true
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if e != nil {
|
|
|
|
|
+ for _, idx := range idxByInbound[ibId] {
|
|
|
|
|
+ failed[idx] = true
|
|
|
|
|
+ if reason[idx] == "" {
|
|
|
|
|
+ reason[idx] = e.Error()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for idx := range prep {
|
|
|
|
|
+ if failed[idx] {
|
|
|
|
|
+ skip(prep[idx].client.Email, reason[idx])
|
|
|
|
|
+ } else {
|
|
|
|
|
+ result.Created++
|
|
|
}
|
|
}
|
|
|
- result.Created++
|
|
|
|
|
}
|
|
}
|
|
|
return result, needRestart, nil
|
|
return result, needRestart, nil
|
|
|
}
|
|
}
|
|
@@ -2976,33 +3363,27 @@ func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, erro
|
|
|
return 0, false, nil
|
|
return 0, false, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- emails := make(map[string]struct{}, len(rows))
|
|
|
|
|
|
|
+ seen := make(map[string]struct{}, len(rows))
|
|
|
|
|
+ emails := make([]string, 0, len(rows))
|
|
|
for _, r := range rows {
|
|
for _, r := range rows {
|
|
|
- if r.Email != "" {
|
|
|
|
|
- emails[r.Email] = struct{}{}
|
|
|
|
|
|
|
+ if r.Email == "" {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, ok := seen[r.Email]; ok {
|
|
|
|
|
+ continue
|
|
|
}
|
|
}
|
|
|
|
|
+ seen[r.Email] = struct{}{}
|
|
|
|
|
+ emails = append(emails, r.Email)
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(emails) == 0 {
|
|
|
|
|
+ return 0, false, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- needRestart := false
|
|
|
|
|
- deleted := 0
|
|
|
|
|
- for email := range emails {
|
|
|
|
|
- var rec model.ClientRecord
|
|
|
|
|
- if err := db.Where("email = ?", email).First(&rec).Error; err != nil {
|
|
|
|
|
- if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- return deleted, needRestart, err
|
|
|
|
|
- }
|
|
|
|
|
- nr, err := s.Delete(inboundSvc, rec.Id, false)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return deleted, needRestart, err
|
|
|
|
|
- }
|
|
|
|
|
- if nr {
|
|
|
|
|
- needRestart = true
|
|
|
|
|
- }
|
|
|
|
|
- deleted++
|
|
|
|
|
|
|
+ res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return res.Deleted, needRestart, err
|
|
|
}
|
|
}
|
|
|
- return deleted, needRestart, nil
|
|
|
|
|
|
|
+ return res.Deleted, needRestart, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *ClientService) ResetAllClientTraffics(inboundSvc *InboundService, id int) error {
|
|
func (s *ClientService) ResetAllClientTraffics(inboundSvc *InboundService, id int) error {
|