Просмотр исходного кода

perf(clients): chunk IN queries and de-quadratic bulk delete/group/list

Bulk client operations bound their entire working set in a single
WHERE x IN (...) clause, which exceeds PostgreSQL's 65535-parameter limit
(and SQLite's 32766) and gives the planner a pathological query, so they
failed outright on inbounds/selections larger than the limit. Every such
query is now chunked at 400 items:

- BulkDelete / delete-all-clients: six IN queries chunked, and the
  per-row delete tombstone (which swept the whole in-memory map on every
  call, O(N^2)) replaced with a single bulk sweep.
- BulkAdjust: record and inbound-mapping lookups chunked.
- AddToGroup / RemoveFromGroup (bulk add/remove to group): three IN
  queries chunked.
- replaceGroupValue (rename/delete group): inbound-mapping lookup chunked.
- List (all-clients listing): link and traffic lookups chunked.

Measured on PostgreSQL 16: delete-all-clients on a 100k-client inbound
now completes in ~7s (previously crashed at the parameter limit); bulk
add/remove to group ~6s and full client list ~1s at 100k.

sync_scale_postgres_test.go adds skip-gated benchmarks for delete-all,
group add/remove, and list.
MHSanaei 13 часов назад
Родитель
Сommit
d1e733b9e9
2 измененных файлов с 243 добавлено и 53 удалено
  1. 133 53
      web/service/client.go
  2. 110 0
      web/service/sync_scale_postgres_test.go

+ 133 - 53
web/service/client.go

@@ -174,6 +174,26 @@ func tombstoneClientEmail(email string) {
 	}
 }
 
+func tombstoneClientEmails(emails []string) {
+	if len(emails) == 0 {
+		return
+	}
+	now := time.Now()
+	cutoff := now.Add(-deleteTombstoneTTL)
+	recentlyDeletedMu.Lock()
+	defer recentlyDeletedMu.Unlock()
+	for _, email := range emails {
+		if email != "" {
+			recentlyDeleted[email] = now
+		}
+	}
+	for e, ts := range recentlyDeleted {
+		if ts.Before(cutoff) {
+			delete(recentlyDeleted, e)
+		}
+	}
+}
+
 func isClientEmailTombstoned(email string) bool {
 	if email == "" {
 		return false
@@ -462,20 +482,26 @@ func (s *ClientService) List() ([]ClientWithAttachments, error) {
 		}
 	}
 
-	var links []model.ClientInbound
-	if err := db.Where("client_id IN ?", clientIds).Find(&links).Error; err != nil {
-		return nil, err
-	}
 	attachments := make(map[int][]int, len(rows))
-	for _, l := range links {
-		attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId)
+	for _, batch := range chunkInts(clientIds, sqlInChunk) {
+		var links []model.ClientInbound
+		if err := db.Where("client_id IN ?", batch).Find(&links).Error; err != nil {
+			return nil, err
+		}
+		for _, l := range links {
+			attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId)
+		}
 	}
 
 	trafficByEmail := make(map[string]*xray.ClientTraffic, len(emails))
 	if len(emails) > 0 {
 		var stats []xray.ClientTraffic
-		if err := db.Where("email IN ?", emails).Find(&stats).Error; err != nil {
-			return nil, err
+		for _, batch := range chunkStrings(emails, sqlInChunk) {
+			var batchStats []xray.ClientTraffic
+			if err := db.Where("email IN ?", batch).Find(&batchStats).Error; err != nil {
+				return nil, err
+			}
+			stats = append(stats, batchStats...)
 		}
 		for i := range stats {
 			trafficByEmail[stats[i].Email] = &stats[i]
@@ -1800,8 +1826,12 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) {
 	}
 
 	var records []model.ClientRecord
-	if err := db.Where("email IN ?", emails).Find(&records).Error; err != nil {
-		return 0, err
+	for _, batch := range chunkStrings(emails, sqlInChunk) {
+		var rows []model.ClientRecord
+		if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
+			return 0, err
+		}
+		records = append(records, rows...)
 	}
 	if len(records) == 0 {
 		return 0, nil
@@ -1812,21 +1842,33 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) {
 	}
 
 	tx := db.Begin()
-	if err := tx.Model(&model.ClientRecord{}).
-		Where("email IN ?", affectedEmails).
-		UpdateColumn("group_name", group).Error; err != nil {
-		tx.Rollback()
-		return 0, err
+	for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
+		if err := tx.Model(&model.ClientRecord{}).
+			Where("email IN ?", batch).
+			UpdateColumn("group_name", group).Error; err != nil {
+			tx.Rollback()
+			return 0, err
+		}
 	}
 
 	var inboundIDs []int
-	if err := tx.Table("client_inbounds").
-		Joins("JOIN clients ON clients.id = client_inbounds.client_id").
-		Where("clients.email IN ?", affectedEmails).
-		Distinct("client_inbounds.inbound_id").
-		Pluck("inbound_id", &inboundIDs).Error; err != nil {
-		tx.Rollback()
-		return 0, err
+	inboundIDSeen := make(map[int]struct{})
+	for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
+		var ids []int
+		if err := tx.Table("client_inbounds").
+			Joins("JOIN clients ON clients.id = client_inbounds.client_id").
+			Where("clients.email IN ?", batch).
+			Distinct("client_inbounds.inbound_id").
+			Pluck("inbound_id", &ids).Error; err != nil {
+			tx.Rollback()
+			return 0, err
+		}
+		for _, id := range ids {
+			if _, ok := inboundIDSeen[id]; !ok {
+				inboundIDSeen[id] = struct{}{}
+				inboundIDs = append(inboundIDs, id)
+			}
+		}
 	}
 
 	emailSet := make(map[string]struct{}, len(affectedEmails))
@@ -1918,13 +1960,23 @@ func (s *ClientService) replaceGroupValue(oldName, newName string) (int, error)
 	}
 
 	var inboundIDs []int
-	if err := tx.Table("client_inbounds").
-		Joins("JOIN clients ON clients.id = client_inbounds.client_id").
-		Where("clients.email IN ?", affectedEmails).
-		Distinct("client_inbounds.inbound_id").
-		Pluck("inbound_id", &inboundIDs).Error; err != nil {
-		tx.Rollback()
-		return 0, err
+	inboundIDSeen := make(map[int]struct{})
+	for _, batch := range chunkStrings(affectedEmails, sqlInChunk) {
+		var ids []int
+		if err := tx.Table("client_inbounds").
+			Joins("JOIN clients ON clients.id = client_inbounds.client_id").
+			Where("clients.email IN ?", batch).
+			Distinct("client_inbounds.inbound_id").
+			Pluck("inbound_id", &ids).Error; err != nil {
+			tx.Rollback()
+			return 0, err
+		}
+		for _, id := range ids {
+			if _, ok := inboundIDSeen[id]; !ok {
+				inboundIDSeen[id] = struct{}{}
+				inboundIDs = append(inboundIDs, id)
+			}
+		}
 	}
 
 	for _, ibID := range inboundIDs {
@@ -2394,8 +2446,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 	db := database.GetDB()
 
 	var records []model.ClientRecord
-	if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
-		return result, false, err
+	for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
+		var rows []model.ClientRecord
+		if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
+			return result, false, err
+		}
+		records = append(records, rows...)
 	}
 	recordsByEmail := make(map[string]*model.ClientRecord, len(records))
 	for i := range records {
@@ -2471,8 +2527,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 	}
 
 	var mappings []model.ClientInbound
-	if err := db.Where("client_id IN ?", plannedIds).Find(&mappings).Error; err != nil {
-		return result, false, err
+	for _, batch := range chunkInts(plannedIds, sqlInChunk) {
+		var rows []model.ClientInbound
+		if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
+			return result, false, err
+		}
+		mappings = append(mappings, rows...)
 	}
 	emailsByInbound := map[int][]string{}
 	for _, m := range mappings {
@@ -2693,6 +2753,8 @@ type BulkDeleteReport struct {
 	Reason string `json:"reason"`
 }
 
+const sqlInChunk = 400
+
 // BulkDelete removes every client in the list in one optimized pass.
 // Instead of running the full single-delete pipeline N times (which would
 // re-read, re-parse, and re-write each inbound's settings JSON for every
@@ -2723,14 +2785,20 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
 	db := database.GetDB()
 
 	var records []model.ClientRecord
-	if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
-		return result, false, err
+	for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
+		var rows []model.ClientRecord
+		if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
+			return result, false, err
+		}
+		records = append(records, rows...)
 	}
 	recordsByEmail := make(map[string]*model.ClientRecord, len(records))
+	tombstoneEmails := make([]string, 0, len(records))
 	for i := range records {
 		recordsByEmail[records[i].Email] = &records[i]
-		tombstoneClientEmail(records[i].Email)
+		tombstoneEmails = append(tombstoneEmails, records[i].Email)
 	}
+	tombstoneClientEmails(tombstoneEmails)
 
 	skippedReasons := map[string]string{}
 	for _, email := range cleanEmails {
@@ -2749,8 +2817,12 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
 	emailsByInbound := map[int][]string{}
 	if len(clientIds) > 0 {
 		var mappings []model.ClientInbound
-		if err := db.Where("client_id IN ?", clientIds).Find(&mappings).Error; err != nil {
-			return result, false, err
+		for _, batch := range chunkInts(clientIds, sqlInChunk) {
+			var rows []model.ClientInbound
+			if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
+				return result, false, err
+			}
+			mappings = append(mappings, rows...)
 		}
 		for _, m := range mappings {
 			email, ok := recordIdToEmail[m.ClientId]
@@ -2785,20 +2857,26 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
 	}
 
 	if len(successIds) > 0 {
-		if err := db.Where("client_id IN ?", successIds).Delete(&model.ClientInbound{}).Error; err != nil {
-			return result, needRestart, err
+		for _, batch := range chunkInts(successIds, sqlInChunk) {
+			if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil {
+				return result, needRestart, err
+			}
 		}
 		if !keepTraffic && len(successEmails) > 0 {
-			if err := db.Where("email IN ?", successEmails).Delete(&xray.ClientTraffic{}).Error; err != nil {
-				return result, needRestart, err
+			for _, batch := range chunkStrings(successEmails, sqlInChunk) {
+				if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil {
+					return result, needRestart, err
+				}
+				if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil {
+					return result, needRestart, err
+				}
 			}
-			if err := db.Where("client_email IN ?", successEmails).Delete(&model.InboundClientIps{}).Error; err != nil {
+		}
+		for _, batch := range chunkInts(successIds, sqlInChunk) {
+			if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil {
 				return result, needRestart, err
 			}
 		}
-		if err := db.Where("id IN ?", successIds).Delete(&model.ClientRecord{}).Error; err != nil {
-			return result, needRestart, err
-		}
 	}
 
 	result.Deleted = len(successEmails)
@@ -2927,13 +3005,15 @@ func (s *ClientService) bulkDelInboundClients(
 			Email  string
 			Enable bool
 		}
-		var rows []trafficRow
-		if err := db.Model(xray.ClientTraffic{}).
-			Where("email IN ?", foundList).
-			Select("email, enable").
-			Scan(&rows).Error; err == nil {
-			for _, r := range rows {
-				notDepletedByEmail[r.Email] = r.Enable
+		for _, batch := range chunkStrings(foundList, sqlInChunk) {
+			var rows []trafficRow
+			if err := db.Model(xray.ClientTraffic{}).
+				Where("email IN ?", batch).
+				Select("email, enable").
+				Scan(&rows).Error; err == nil {
+				for _, r := range rows {
+					notDepletedByEmail[r.Email] = r.Enable
+				}
 			}
 		}
 	}

+ 110 - 0
web/service/sync_scale_postgres_test.go

@@ -232,6 +232,116 @@ func TestAddDelClientPostgresScale(t *testing.T) {
 	}
 }
 
+func TestGroupAndListPostgresScale(t *testing.T) {
+	if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
+		t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
+	}
+	if err := database.InitDB(""); err != nil {
+		t.Fatalf("InitDB: %v", err)
+	}
+	t.Cleanup(func() { _ = database.CloseDB() })
+
+	svc := &ClientService{}
+	sizes := []int{5000, 100000}
+
+	for _, n := range sizes {
+		t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
+			db := database.GetDB()
+			if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
+				t.Fatalf("truncate: %v", err)
+			}
+			clients := makeScaleClients(n)
+			ib := &model.Inbound{Tag: fmt.Sprintf("grp-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
+			if err := db.Create(ib).Error; err != nil {
+				t.Fatalf("create inbound: %v", err)
+			}
+			if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
+				t.Fatalf("seed SyncInbound: %v", err)
+			}
+			db.Exec("ANALYZE")
+			emails := make([]string, n)
+			for i := 0; i < n; i++ {
+				emails[i] = clients[i].Email
+			}
+
+			start := time.Now()
+			if _, err := svc.AddToGroup(emails, "benchgroup"); err != nil {
+				t.Fatalf("AddToGroup: %v", err)
+			}
+			addDur := time.Since(start)
+
+			start = time.Now()
+			if _, err := svc.RemoveFromGroup(emails); err != nil {
+				t.Fatalf("RemoveFromGroup: %v", err)
+			}
+			rmDur := time.Since(start)
+
+			start = time.Now()
+			list, err := svc.List()
+			if err != nil {
+				t.Fatalf("List: %v", err)
+			}
+			listDur := time.Since(start)
+			if len(list) != n {
+				t.Fatalf("List returned %d, want %d", len(list), n)
+			}
+
+			t.Logf("N=%-7d bulkAdd=%-9v bulkRemove=%-9v list=%-9v", n,
+				addDur.Round(time.Millisecond), rmDur.Round(time.Millisecond), listDur.Round(time.Millisecond))
+		})
+	}
+}
+
+func TestDelAllClientsPostgresScale(t *testing.T) {
+	if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
+		t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
+	}
+	if err := database.InitDB(""); err != nil {
+		t.Fatalf("InitDB: %v", err)
+	}
+	t.Cleanup(func() { _ = database.CloseDB() })
+
+	svc := &ClientService{}
+	inboundSvc := &InboundService{}
+	sizes := []int{5000, 50000, 100000}
+
+	for _, n := range sizes {
+		t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
+			db := database.GetDB()
+			if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
+				t.Fatalf("truncate: %v", err)
+			}
+			clients := makeScaleClients(n)
+			ib := &model.Inbound{Tag: fmt.Sprintf("delall-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
+			if err := db.Create(ib).Error; err != nil {
+				t.Fatalf("create inbound: %v", err)
+			}
+			if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
+				t.Fatalf("seed SyncInbound: %v", err)
+			}
+
+			emails, err := inboundSvc.EmailsByInbound(ib.Id)
+			if err != nil {
+				t.Fatalf("EmailsByInbound: %v", err)
+			}
+			start := time.Now()
+			res, _, err := svc.BulkDelete(inboundSvc, emails, false)
+			if err != nil {
+				t.Fatalf("BulkDelete: %v", err)
+			}
+			dur := time.Since(start)
+
+			var recCount, linkCount int64
+			db.Model(&model.ClientRecord{}).Count(&recCount)
+			db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount)
+			if recCount != 0 || linkCount != 0 {
+				t.Fatalf("after delAll: records=%d links=%d want 0/0", recCount, linkCount)
+			}
+			t.Logf("N=%-7d delAllClients=%-10v deleted=%d", n, dur.Round(time.Millisecond), res.Deleted)
+		})
+	}
+}
+
 func TestBulkOpsPostgresScale(t *testing.T) {
 	if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
 		t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")