1
0
Эх сурвалжийг харах

fix(sub): resolve subscription clients and stats from normalized tables

A subscription fetch inside a large inbound cost seconds because every
layer re-parsed the inbound's full settings JSON: getInboundsBySubId
preloaded the whole client_traffics table of each matched inbound,
matchingClients parsed all clients to filter by subId, and then every
per-protocol generator (raw links, JSON outbounds, Clash proxies) parsed
the blob again per link — once to find the client by email and once for
inbound-level fields like encryption or method. At 500k clients in one
inbound that was 13s per raw fetch and 8.5s per JSON fetch; at 100k,
2.6s/1.7s. After this change both cost ~70ms at 100k.

matchingClients now resolves through the indexed clients/client_inbounds
tables (ListForInboundBySubId, ordered by clients.id like ListForInbound
— the same source the running Xray users are built from), and the
per-request SubService carries two caches: clientsByInbound, primed by
matchingClients/inboundLinks so clientForLink resolves a client without
parsing settings (with the old full-parse as fallback, which also fixes
the export-all-links path that re-parsed the blob once per client), and
settingsByInbound, a once-per-request shallow decode that skips
materializing the clients array entirely. The ClientStats preload is
replaced by loading only the subscriber's traffic rows (indexed
clients.sub_id); statsForClient's per-email DB fallback (#5567) covers
any miss, and the case-insensitive email dedupe keeps the #5134
guarantee for case-differing duplicate rows.
MHSanaei 1 өдөр өмнө
parent
commit
7c12700c7d

+ 3 - 6
internal/sub/clash_service.go

@@ -240,8 +240,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
 	case model.VLESS:
 		proxy["type"] = "vless"
 		proxy["uuid"] = applyVlessRoute(client.ID, hostVlessRoute(ep))
-		var inboundSettings map[string]any
-		_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
+		inboundSettings := subReq.linkSettings(inbound)
 		streamSecurity, _ := stream["security"].(string)
 		if client.Flow != "" && vlessFlowAllowed(network, streamSecurity, inboundSettings) {
 			proxy["flow"] = client.Flow
@@ -258,8 +257,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
 	case model.Shadowsocks:
 		proxy["type"] = "ss"
 		proxy["password"] = client.Password
-		var inboundSettings map[string]any
-		_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
+		inboundSettings := subReq.linkSettings(inbound)
 		method, _ := inboundSettings["method"].(string)
 		if method == "" {
 			return nil
@@ -288,8 +286,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
 // helpers prune fields (like `allowInsecure` / the salamander obfs
 // block) that the hysteria proxy wants preserved.
 func (s *SubClashService) buildHysteriaProxy(subReq *SubService, inbound *model.Inbound, client model.Client, ep map[string]any) map[string]any {
-	var inboundSettings map[string]any
-	_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
+	inboundSettings := subReq.linkSettings(inbound)
 
 	proxyType := "hysteria2"
 	authKey := "password"

+ 8 - 10
internal/sub/json_service.go

@@ -212,11 +212,11 @@ func (s *SubJsonService) getConfig(subReq *SubService, inbound *model.Inbound, c
 		case "vless":
 			vc := client
 			vc.ID = applyVlessRoute(client.ID, hostVlessRoute(extPrxy))
-			newOutbounds = append(newOutbounds, s.genVless(inbound, streamSettings, vc, jsonMux(mux, hostMux)))
+			newOutbounds = append(newOutbounds, s.genVless(subReq, inbound, streamSettings, vc, jsonMux(mux, hostMux)))
 		case "trojan", "shadowsocks":
-			newOutbounds = append(newOutbounds, s.genServer(inbound, streamSettings, client, jsonMux(mux, hostMux)))
+			newOutbounds = append(newOutbounds, s.genServer(subReq, inbound, streamSettings, client, jsonMux(mux, hostMux)))
 		case "hysteria":
-			newOutbounds = append(newOutbounds, s.genHy(inbound, newStream, client, jsonMux(mux, hostMux)))
+			newOutbounds = append(newOutbounds, s.genHy(subReq, inbound, newStream, client, jsonMux(mux, hostMux)))
 		}
 
 		newOutbounds = append(newOutbounds, s.defaultOutbounds...)
@@ -393,7 +393,7 @@ func (s *SubJsonService) genVnext(inbound *model.Inbound, streamSettings json_ut
 	return result
 }
 
-func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
+func (s *SubJsonService) genVless(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
 	outbound := Outbound{}
 	outbound.Protocol = string(inbound.Protocol)
 	outbound.Tag = "proxy"
@@ -403,8 +403,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
 	outbound.StreamSettings = streamSettings
 
 	// Add encryption for VLESS outbound from inbound settings
-	var inboundSettings map[string]any
-	_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
+	inboundSettings := subReq.linkSettings(inbound)
 	encryption, _ := inboundSettings["encryption"].(string)
 
 	settings := map[string]any{
@@ -422,7 +421,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
 	return result
 }
 
-func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
+func (s *SubJsonService) genServer(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
 	outbound := Outbound{}
 
 	serverData := make([]ServerSetting, 1)
@@ -434,8 +433,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
 	}
 
 	if inbound.Protocol == model.Shadowsocks {
-		var inboundSettings map[string]any
-		_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
+		inboundSettings := subReq.linkSettings(inbound)
 		method, _ := inboundSettings["method"].(string)
 		serverData[0].Method = method
 
@@ -475,7 +473,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
 	return result
 }
 
-func (s *SubJsonService) genHy(inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
+func (s *SubJsonService) genHy(subReq *SubService, inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
 	outbound := Outbound{}
 
 	outbound.Protocol = string(inbound.Protocol)

+ 5 - 5
internal/sub/json_service_test.go

@@ -122,7 +122,7 @@ func TestSubJsonServiceVlessFlattened(t *testing.T) {
 	inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
 	client := model.Client{ID: "uuid-1", Flow: "xtls-rprx-vision"}
 
-	settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(inbound, nil, client, ""))
+	settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(&SubService{}, inbound, nil, client, ""))
 	if _, ok := settings["vnext"]; ok {
 		t.Fatal("vless outbound must not use vnext")
 	}
@@ -151,7 +151,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
 	trojan := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Trojan, Settings: `{}`}
 	client := model.Client{Password: "p4ss"}
 
-	settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(trojan, nil, client, ""))
+	settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, trojan, nil, client, ""))
 	server := firstServer(settings)
 	if server == nil {
 		t.Fatalf("trojan outbound must use a servers array, got: %#v", settings)
@@ -164,7 +164,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
 	}
 
 	ss := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Shadowsocks, Settings: `{"method":"aes-256-gcm"}`}
-	ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(ss, nil, client, ""))
+	ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, ss, nil, client, ""))
 	ssServer := firstServer(ssSettings)
 	if ssServer == nil {
 		t.Fatalf("shadowsocks outbound must use a servers array, got: %#v", ssSettings)
@@ -194,7 +194,7 @@ func TestSubJsonServiceXmuxSuppressesGlobalMux(t *testing.T) {
 	inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
 	client := model.Client{ID: "uuid-1"}
 
-	raw := svc.genVless(inbound, streamSettings, client, mux)
+	raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
 	var ob map[string]any
 	if err := json.Unmarshal(raw, &ob); err != nil {
 		t.Fatalf("unmarshal outbound: %v", err)
@@ -240,7 +240,7 @@ func TestSubJsonServiceGlobalMuxWhenNoXmux(t *testing.T) {
 	inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
 	client := model.Client{ID: "uuid-1"}
 
-	raw := svc.genVless(inbound, streamSettings, client, mux)
+	raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
 	var ob map[string]any
 	if err := json.Unmarshal(raw, &ob); err != nil {
 		t.Fatalf("unmarshal outbound: %v", err)

+ 4 - 4
internal/sub/mutation_audit_test.go

@@ -67,11 +67,11 @@ func TestSubJsonService_MuxAttachedWhenConfigured(t *testing.T) {
 		protocol model.Protocol
 	}{
 		{"vmess mux", NewSubJsonService(mux, "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, mux), true, model.VMESS},
-		{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
-		{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
+		{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
+		{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
 		{"vmess no mux", NewSubJsonService("", "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, ""), false, model.VMESS},
-		{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
-		{"server no mux", NewSubJsonService("", "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
+		{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
+		{"server no mux", NewSubJsonService("", "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {

+ 2 - 5
internal/sub/remark_vars.go

@@ -459,11 +459,8 @@ func (s *SubService) statsForClient(inbound *model.Inbound, client model.Client)
 // needed when a global remark template references client-only tokens. Falls back
 // to an email-only client if not found.
 func (s *SubService) lookupClient(inbound *model.Inbound, email string) model.Client {
-	clients, _ := s.inboundService.GetClients(inbound)
-	for _, c := range clients {
-		if c.Email == email {
-			return c
-		}
+	if c, ok := s.clientForLink(inbound, email); ok {
+		return c
 	}
 	return model.Client{Email: email}
 }

+ 171 - 65
internal/sub/service.go

@@ -54,6 +54,17 @@ type SubService struct {
 	// doesn't own its row (multi-inbound subscriptions). Filled in
 	// getInboundsBySubId; reset per request in PrepareForRequest.
 	statsByEmail map[string]xray.ClientTraffic
+	// clientsByInbound caches clients resolved for this request keyed by
+	// inbound id then email, so the per-protocol link generators look a client
+	// up without re-parsing the inbound's settings JSON per link.
+	// fullyPrimedInbounds marks inbounds whose complete client list is cached
+	// (a miss there is authoritative). Reset per request in PrepareForRequest.
+	clientsByInbound    map[int]map[string]model.Client
+	fullyPrimedInbounds map[int]bool
+	// settingsByInbound caches each inbound's settings decoded once per request
+	// with the clients array left out; generators read only inbound-level
+	// fields (encryption, method, version, …) from it.
+	settingsByInbound map[int]map[string]any
 }
 
 // NewSubService creates a new subscription service with the given configuration.
@@ -86,10 +97,96 @@ func (s *SubService) PrepareForRequest(host string) {
 	s.address = host
 	s.usageShown = map[string]bool{}
 	s.statsByEmail = map[string]xray.ClientTraffic{}
+	s.clientsByInbound = map[int]map[string]model.Client{}
+	s.fullyPrimedInbounds = map[int]bool{}
+	s.settingsByInbound = map[int]map[string]any{}
 	s.loadNodes()
 	s.loadRemarkSettings()
 }
 
+// primeLinkClients caches clients (first occurrence per email, matching the
+// old settings-JSON iteration order) so clientForLink resolves them without a
+// parse. complete marks the inbound's whole client list as cached.
+func (s *SubService) primeLinkClients(inboundId int, clients []model.Client, complete bool) {
+	if inboundId <= 0 {
+		return
+	}
+	if s.clientsByInbound == nil {
+		s.clientsByInbound = map[int]map[string]model.Client{}
+	}
+	m := s.clientsByInbound[inboundId]
+	if m == nil {
+		m = make(map[string]model.Client, len(clients))
+		s.clientsByInbound[inboundId] = m
+	}
+	for _, c := range clients {
+		if _, exists := m[c.Email]; !exists {
+			m[c.Email] = c
+		}
+	}
+	if complete {
+		if s.fullyPrimedInbounds == nil {
+			s.fullyPrimedInbounds = map[int]bool{}
+		}
+		s.fullyPrimedInbounds[inboundId] = true
+	}
+}
+
+// clientForLink resolves one client of an inbound by email for link
+// generation: from the per-request cache when primed, otherwise by parsing
+// the settings JSON once and caching every client from it.
+func (s *SubService) clientForLink(inbound *model.Inbound, email string) (model.Client, bool) {
+	if m, ok := s.clientsByInbound[inbound.Id]; ok {
+		if c, hit := m[email]; hit {
+			return c, true
+		}
+		if s.fullyPrimedInbounds[inbound.Id] {
+			return model.Client{}, false
+		}
+	}
+	clients, err := s.inboundService.GetClients(inbound)
+	if err != nil {
+		return model.Client{}, false
+	}
+	s.primeLinkClients(inbound.Id, clients, true)
+	for i := range clients {
+		if clients[i].Email == email {
+			return clients[i], true
+		}
+	}
+	return model.Client{}, false
+}
+
+// linkSettings returns the inbound's settings decoded once per request with
+// the clients array left out — the link generators read only inbound-level
+// fields from it and resolve clients via clientForLink. The shallow
+// RawMessage pass skips materializing a huge clients array entirely.
+func (s *SubService) linkSettings(inbound *model.Inbound) map[string]any {
+	if inbound.Id > 0 {
+		if cached, ok := s.settingsByInbound[inbound.Id]; ok {
+			return cached
+		}
+	}
+	shallow := map[string]json.RawMessage{}
+	_ = json.Unmarshal([]byte(inbound.Settings), &shallow)
+	out := make(map[string]any, len(shallow))
+	for key, raw := range shallow {
+		if key == "clients" {
+			continue
+		}
+		var value any
+		_ = json.Unmarshal(raw, &value)
+		out[key] = value
+	}
+	if inbound.Id > 0 {
+		if s.settingsByInbound == nil {
+			s.settingsByInbound = map[int]map[string]any{}
+		}
+		s.settingsByInbound[inbound.Id] = out
+	}
+	return out
+}
+
 // loadRemarkSettings populates the per-request remark formatting state so
 // every subscription format — raw, JSON, Clash — renders remarks the same way
 // (the date formatter reads datepicker). Loading it only in getSubs left
@@ -144,25 +241,23 @@ func listenIsInternalOnly(listen string) bool {
 }
 
 // matchingClients returns the inbound's clients whose SubID equals subId,
-// deduplicated by email. settings.clients can accumulate duplicate entries
-// for the same client (multi-node sync/import drift, old DBs): SyncInbound
-// dedupes the normalized client_inbounds rows on write but never rewrites
-// the legacy JSON, and the subscription builders iterate that JSON — so
-// without this guard every duplicate became a duplicate profile in the
-// output (#5134). Link generation keys purely on (inbound, email), so
-// same-email entries are pure duplicates and dropping them is lossless.
+// resolved from the normalized clients/client_inbounds tables (both filter
+// columns indexed) instead of parsing the settings JSON — at large client
+// counts that parse made every subscription fetch cost seconds. The
+// case-insensitive email dedupe stays as cheap insurance even though
+// clients.email is unique, preserving the #5134 guarantee that duplicate
+// settings entries never fan out into duplicate profiles. Resolved clients
+// are primed into the per-request cache so the link generators don't parse
+// settings either.
 func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []model.Client {
-	clients, err := s.inboundService.GetClients(inbound)
+	clients, err := s.inboundService.GetClientsBySubId(inbound.Id, subId)
 	if err != nil {
-		logger.Error("SubService - GetClients: Unable to get clients from inbound")
+		logger.Error("SubService - GetClientsBySubId: Unable to get clients from inbound")
 		return nil
 	}
 	var out []model.Client
 	seen := make(map[string]struct{}, len(clients))
 	for _, client := range clients {
-		if client.SubID != subId {
-			continue
-		}
 		key := strings.ToLower(client.Email)
 		if _, dup := seen[key]; dup {
 			continue
@@ -170,6 +265,7 @@ func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []mod
 		seen[key] = struct{}{}
 		out = append(out, client)
 	}
+	s.primeLinkClients(inbound.Id, out, false)
 	return out
 }
 
@@ -253,6 +349,7 @@ func (s *SubService) inboundLinks(inbound *model.Inbound) []string {
 	if err != nil {
 		return nil
 	}
+	s.primeLinkClients(inbound.Id, clients, true)
 	s.projectThroughFallbackMaster(inbound)
 	hostEps := s.hostEndpoints(inbound, "raw")
 	var out []string
@@ -362,7 +459,7 @@ func subscriptionExpiryFromClient(nowMs, expiryTime int64) int64 {
 func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) {
 	db := database.GetDB()
 	var inbounds []*model.Inbound
-	err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in (
+	err := db.Model(model.Inbound{}).Where(`id in (
 		SELECT DISTINCT inbounds.id
 		FROM inbounds
 		JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id
@@ -374,19 +471,34 @@ func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error)
 	if err != nil {
 		return nil, err
 	}
-	s.indexStatsByEmail(inbounds)
+	s.indexStatsBySubId(subId)
 	return inbounds, nil
 }
 
-// indexStatsByEmail records every loaded inbound's client traffic rows keyed by
-// email so statsForClient can resolve a client's usage even on an inbound that
-// doesn't own its (globally unique) client_traffics row. See statsByEmail.
-func (s *SubService) indexStatsByEmail(inbounds []*model.Inbound) {
+// indexStatsBySubId loads the traffic rows for just this subscriber's clients
+// into statsByEmail so statsForClient can resolve a client's usage on any of
+// its inbounds. It replaces preloading every matched inbound's ClientStats,
+// which read the entire client_traffics table on every subscription fetch of
+// a large inbound; statsForClient's per-email DB fallback covers any miss.
+func (s *SubService) indexStatsBySubId(subId string) {
 	if s.statsByEmail == nil {
 		s.statsByEmail = map[string]xray.ClientTraffic{}
 	}
-	for _, inbound := range inbounds {
-		for _, st := range inbound.ClientStats {
+	db := database.GetDB()
+	var emails []string
+	if err := db.Model(&model.ClientRecord{}).Where("sub_id = ?", subId).Pluck("email", &emails).Error; err != nil {
+		logger.Error("SubService - indexStatsBySubId: load emails:", err)
+		return
+	}
+	const chunk = 400
+	for lo := 0; lo < len(emails); lo += chunk {
+		hi := min(lo+chunk, len(emails))
+		var rows []xray.ClientTraffic
+		if err := db.Where("email IN ?", emails[lo:hi]).Find(&rows).Error; err != nil {
+			logger.Error("SubService - indexStatsBySubId: load traffics:", err)
+			return
+		}
+		for _, st := range rows {
 			s.statsByEmail[st.Email] = st
 		}
 	}
@@ -516,21 +628,14 @@ func (s *SubService) genWireguardLink(inbound *model.Inbound, email string) stri
 	if inbound.Protocol != model.WireGuard {
 		return ""
 	}
-	settings := map[string]any{}
-	_ = json.Unmarshal([]byte(inbound.Settings), &settings)
+	settings := s.linkSettings(inbound)
 	secretKey, _ := settings["secretKey"].(string)
 
-	clients, _ := s.inboundService.GetClients(inbound)
-	var client *model.Client
-	for i := range clients {
-		if clients[i].Email == email {
-			client = &clients[i]
-			break
-		}
-	}
-	if client == nil || client.PrivateKey == "" {
+	resolved, ok := s.clientForLink(inbound, email)
+	if !ok || resolved.PrivateKey == "" {
 		return ""
 	}
+	client := &resolved
 
 	link := fmt.Sprintf("wireguard://%s@%s", encodeUserinfo(client.PrivateKey), joinHostPort(s.resolveInboundAddress(inbound), inbound.Port))
 	params := make(map[string]string)
@@ -607,10 +712,12 @@ func (s *SubService) genVmessLink(inbound *model.Inbound, email string) string {
 		applyVmessTLSParams(stream, obj)
 	}
 
-	clients, _ := s.inboundService.GetClients(inbound)
-	clientIndex := findClientIndex(clients, email)
-	obj["id"] = clients[clientIndex].ID
-	obj["scy"] = clients[clientIndex].Security
+	client, ok := s.clientForLink(inbound, email)
+	if !ok {
+		return ""
+	}
+	obj["id"] = client.ID
+	obj["scy"] = client.Security
 
 	externalProxies, _ := stream["externalProxy"].([]any)
 
@@ -659,17 +766,18 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
 	}
 	address := s.resolveInboundAddress(inbound)
 	stream := unmarshalStreamSettings(inbound.StreamSettings)
-	clients, _ := s.inboundService.GetClients(inbound)
-	clientIndex := findClientIndex(clients, email)
-	uuid := clients[clientIndex].ID
+	client, ok := s.clientForLink(inbound, email)
+	if !ok {
+		return ""
+	}
+	uuid := client.ID
 	port := inbound.Port
 	streamNetwork := stream["network"].(string)
 	params := make(map[string]string)
 	params["type"] = streamNetwork
 
 	// Add encryption parameter for VLESS from inbound settings
-	var settings map[string]any
-	_ = json.Unmarshal([]byte(inbound.Settings), &settings)
+	settings := s.linkSettings(inbound)
 	if encryption, ok := settings["encryption"].(string); ok {
 		params["encryption"] = encryption
 	}
@@ -683,12 +791,12 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
 	case "tls":
 		applyShareTLSParams(stream, params)
 	case "reality":
-		applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
+		applyShareRealityParams(stream, params, subKey(client))
 	default:
 		params["security"] = "none"
 	}
-	if len(clients[clientIndex].Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
-		params["flow"] = clients[clientIndex].Flow
+	if len(client.Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
+		params["flow"] = client.Flow
 	}
 
 	externalProxies, _ := stream["externalProxy"].([]any)
@@ -717,9 +825,11 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
 	}
 	address := s.resolveInboundAddress(inbound)
 	stream := unmarshalStreamSettings(inbound.StreamSettings)
-	clients, _ := s.inboundService.GetClients(inbound)
-	clientIndex := findClientIndex(clients, email)
-	password := encodeUserinfo(clients[clientIndex].Password)
+	client, ok := s.clientForLink(inbound, email)
+	if !ok {
+		return ""
+	}
+	password := encodeUserinfo(client.Password)
 	port := inbound.Port
 	streamNetwork := stream["network"].(string)
 	params := make(map[string]string)
@@ -734,9 +844,9 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
 	case "tls":
 		applyShareTLSParams(stream, params)
 	case "reality":
-		applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
-		if streamNetwork == "tcp" && len(clients[clientIndex].Flow) > 0 {
-			params["flow"] = clients[clientIndex].Flow
+		applyShareRealityParams(stream, params, subKey(client))
+		if streamNetwork == "tcp" && len(client.Flow) > 0 {
+			params["flow"] = client.Flow
 		}
 	default:
 		params["security"] = "none"
@@ -788,13 +898,14 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
 	}
 	address := s.resolveInboundAddress(inbound)
 	stream := unmarshalStreamSettings(inbound.StreamSettings)
-	clients, _ := s.inboundService.GetClients(inbound)
+	client, ok := s.clientForLink(inbound, email)
+	if !ok {
+		return ""
+	}
 
-	var settings map[string]any
-	_ = json.Unmarshal([]byte(inbound.Settings), &settings)
+	settings := s.linkSettings(inbound)
 	inboundPassword := settings["password"].(string)
 	method := settings["method"].(string)
-	clientIndex := findClientIndex(clients, email)
 	streamNetwork := stream["network"].(string)
 	params := make(map[string]string)
 	params["type"] = streamNetwork
@@ -828,9 +939,9 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
 		userInfo = fmt.Sprintf("%s:%s:%s",
 			url.QueryEscape(method),
 			url.QueryEscape(inboundPassword),
-			url.QueryEscape(clients[clientIndex].Password))
+			url.QueryEscape(client.Password))
 	} else {
-		userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, clients[clientIndex].Password))
+		userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, client.Password))
 	}
 
 	externalProxies, _ := stream["externalProxy"].([]any)
@@ -861,15 +972,11 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
 	}
 	var stream map[string]any
 	_ = json.Unmarshal([]byte(inbound.StreamSettings), &stream)
-	clients, _ := s.inboundService.GetClients(inbound)
-	clientIndex := -1
-	for i, client := range clients {
-		if client.Email == email {
-			clientIndex = i
-			break
-		}
+	client, ok := s.clientForLink(inbound, email)
+	if !ok {
+		return ""
 	}
-	auth := encodeUserinfo(clients[clientIndex].Auth)
+	auth := encodeUserinfo(client.Auth)
 	params := make(map[string]string)
 
 	params["security"] = "tls"
@@ -928,8 +1035,7 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
 		}
 	}
 
-	var settings map[string]any
-	_ = json.Unmarshal([]byte(inbound.Settings), &settings)
+	settings := s.linkSettings(inbound)
 	version, _ := settings["version"].(float64)
 	protocol := "hysteria2"
 	if int(version) == 1 {

+ 26 - 10
internal/sub/service_dedup_test.go

@@ -69,19 +69,35 @@ func TestGetSubs_DuplicateSettingsClients_Deduped(t *testing.T) {
 	}
 }
 
-// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count:
-// the two entries differ only by email case, so dropping strings.ToLower (or keying on
-// another field) yields two clients. The byte-identical dupes above can't catch that.
+// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count.
+// clients.email is unique but case-sensitively so: two rows differing only by email case
+// can coexist (import drift), and dropping strings.ToLower (or keying on another field)
+// would emit both. The first row by id must win, matching the old settings-JSON order.
 func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
+	dbDir := t.TempDir()
+	t.Setenv("XUI_DB_FOLDER", dbDir)
+	if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
+		t.Fatalf("InitDB: %v", err)
+	}
+	t.Cleanup(func() { _ = database.CloseDB() })
+
 	const subId = "s1"
 	const uuid = "11111111-2222-4333-8444-555555555555"
-	ib := &model.Inbound{
-		Protocol: model.VLESS,
-		Settings: `{"clients":[
-			{"id":"` + uuid + `","email":"[email protected]","subId":"` + subId + `","enable":true},
-			{"id":"` + uuid + `","email":"[email protected]","subId":"` + subId + `","enable":true}
-		]}`,
+	db := database.GetDB()
+	ib := &model.Inbound{Protocol: model.VLESS, Enable: true, Port: 42002, Tag: "dedup-ci", Settings: `{"clients":[]}`}
+	if err := db.Create(ib).Error; err != nil {
+		t.Fatalf("seed inbound: %v", err)
+	}
+	for _, email := range []string{"[email protected]", "[email protected]"} {
+		c := &model.ClientRecord{Email: email, SubID: subId, UUID: uuid, Enable: true}
+		if err := db.Create(c).Error; err != nil {
+			t.Fatalf("seed client %q: %v", email, err)
+		}
+		if err := db.Create(&model.ClientInbound{ClientId: c.Id, InboundId: ib.Id}).Error; err != nil {
+			t.Fatalf("seed client_inbound %q: %v", email, err)
+		}
 	}
+
 	s := &SubService{}
 	got := s.matchingClients(ib, subId)
 	if len(got) != 1 {
@@ -90,7 +106,7 @@ func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
 	if got[0].Email != "[email protected]" {
 		t.Fatalf("first occurrence must be kept, got %q", got[0].Email)
 	}
-	// A wrong subId must still be excluded (guards the subId filter at service.go:127).
+	// A wrong subId must still be excluded (guards the SQL subId filter).
 	if other := s.matchingClients(ib, "nope"); len(other) != 0 {
 		t.Fatalf("non-matching subId must yield 0 clients, got %d", len(other))
 	}

+ 31 - 0
internal/web/service/client_link.go

@@ -197,3 +197,34 @@ func (s *ClientService) ListForInbound(tx *gorm.DB, inboundId int) ([]model.Clie
 	}
 	return out, nil
 }
+
+// ListForInboundBySubId is ListForInbound narrowed to one subscription id —
+// both filter columns are indexed, so the subscription server resolves a
+// subscriber's clients without touching the inbound's settings JSON.
+func (s *ClientService) ListForInboundBySubId(tx *gorm.DB, inboundId int, subId string) ([]model.Client, error) {
+	if tx == nil {
+		tx = database.GetDB()
+	}
+	type joinedRow struct {
+		model.ClientRecord
+		FlowOverride string
+	}
+	var rows []joinedRow
+	err := tx.Table("clients").
+		Select("clients.*, client_inbounds.flow_override AS flow_override").
+		Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
+		Where("client_inbounds.inbound_id = ? AND clients.sub_id = ?", inboundId, subId).
+		Order("clients.id ASC").
+		Find(&rows).Error
+	if err != nil {
+		return nil, err
+	}
+
+	out := make([]model.Client, 0, len(rows))
+	for i := range rows {
+		c := rows[i].ToClient()
+		c.Flow = rows[i].FlowOverride
+		out = append(out, *c)
+	}
+	return out, nil
+}

+ 7 - 0
internal/web/service/inbound.go

@@ -409,6 +409,13 @@ func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, err
 	return clients, nil
 }
 
+// GetClientsBySubId returns the inbound's clients with the given subscription
+// id, resolved from the normalized clients tables (the same source the running
+// Xray users are built from) instead of parsing the settings JSON blob.
+func (s *InboundService) GetClientsBySubId(inboundId int, subId string) ([]model.Client, error) {
+	return s.clientService.ListForInboundBySubId(nil, inboundId, subId)
+}
+
 func (s *InboundService) GetAllEmails() ([]string, error) {
 	db := database.GetDB()
 	var emails []string