Răsfoiți Sursa

feat(web): broadcast delta client stats above a snapshot threshold

Both 5s broadcasters (the local traffic poll and the node traffic sync)
shipped the complete client_traffics table on every cycle while a browser
was connected. At 500k clients that is a 1.7s full-table read plus an
86MB marshal per job per poll — and the hub drops any payload over 10MB
and sends an invalidate message that the frontend ignores for these
message types, so past ~55k clients all of it was pure waste and the UI
got nothing.

Installs at or below 5000 clients (clientStatsSnapshotMaxClients) keep
the exact full-snapshot behavior — that behavior exists because a pure delta feed
left UI rows stale when nothing moved in a cycle (see GetAllClientTraffics)
— and the payload now carries snapshot=true. Above the threshold the jobs
send only this cycle's active rows (the xray poll's active emails, or the
emails online on the synced nodes) with snapshot=false, and scope the
last-online map to those rows; the initial full map still arrives over
REST and the clients page refetches every 5s.

GetActiveClientTraffics gains the overlayGlobalTraffic pass so delta rows
carry the same cross-panel usage as snapshot rows. The node job also
stops reading the full last-online map before the has-clients gate, which
was a wasted full-table read on every tick with no dashboard open.

Frontend: useClients keeps its live summary strictly snapshot-driven
(snapshot=false payloads skip the allClientStats replace and the summary
falls back to the server-computed one); the per-row page merge and the
inbounds-page merges already handle deltas.
MHSanaei 1 zi în urmă
părinte
comite
fc5be5b9e4

+ 2 - 2
frontend/src/hooks/useClients.ts

@@ -551,9 +551,9 @@ export function useClients() {
 
   const applyClientStatsEvent = useCallback((payload: unknown) => {
     if (!payload || typeof payload !== 'object') return;
-    const p = payload as { clients?: ClientStatRow[] };
+    const p = payload as { clients?: ClientStatRow[]; snapshot?: boolean };
     if (!Array.isArray(p.clients) || p.clients.length === 0) return;
-    setAllClientStats(p.clients);
+    if (p.snapshot !== false) setAllClientStats(p.clients);
     const byEmail = new Map<string, ClientTraffic>();
     for (const row of p.clients) {
       if (row && row.email) byEmail.set(row.email, row);

+ 66 - 21
internal/web/job/node_traffic_sync_job.go

@@ -110,6 +110,8 @@ func (j *NodeTrafficSyncJob) Run() {
 
 	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
 	var wg sync.WaitGroup
+	var activeMu sync.Mutex
+	var activeEmails []string
 	for _, n := range nodes {
 		if !n.Enable || n.Status != "online" {
 			continue
@@ -120,7 +122,11 @@ func (j *NodeTrafficSyncJob) Run() {
 		common.GoRecover("node-traffic-sync:"+n.Name, func() {
 			defer wg.Done()
 			defer func() { <-sem }()
-			j.syncOne(mgr, n, doIpSync)
+			if emails := j.syncOne(mgr, n, doIpSync); len(emails) > 0 {
+				activeMu.Lock()
+				activeEmails = append(activeEmails, emails...)
+				activeMu.Unlock()
+			}
 		})
 	}
 	wg.Wait()
@@ -143,14 +149,6 @@ func (j *NodeTrafficSyncJob) Run() {
 
 	j.maybePushGlobals(mgr, nodes)
 
-	lastOnline, err := j.inboundService.GetClientsLastOnline()
-	if err != nil {
-		logger.Warning("node traffic sync: get last-online failed:", err)
-	}
-	if lastOnline == nil {
-		lastOnline = map[string]int64{}
-	}
-
 	// Prune stale local-online entries (no local active emails or inbound tags
 	// to add here — only the local xray poll feeds those) so a stopped local
 	// xray's clients and inbounds still age out between traffic polls.
@@ -164,6 +162,45 @@ func (j *NodeTrafficSyncJob) Run() {
 		return
 	}
 
+	// Same snapshot-vs-delta split as the local traffic job: above the
+	// threshold a full snapshot would be dropped by the hub's payload cap, so
+	// send only the rows for clients online on the synced nodes this tick.
+	snapshot := true
+	if total, countErr := j.inboundService.CountClientTraffics(); countErr != nil {
+		logger.Warning("node traffic sync: count client traffics failed:", countErr)
+	} else if total > clientStatsSnapshotMaxClients {
+		snapshot = false
+	}
+
+	var stats []*xray.ClientTraffic
+	var statsErr error
+	if snapshot {
+		stats, statsErr = j.inboundService.GetAllClientTraffics()
+	} else {
+		stats, statsErr = j.inboundService.GetActiveClientTraffics(activeEmails)
+	}
+	if statsErr != nil {
+		logger.Warning("node traffic sync: get client traffics for websocket failed:", statsErr)
+	}
+
+	var lastOnline map[string]int64
+	if snapshot {
+		var loErr error
+		if lastOnline, loErr = j.inboundService.GetClientsLastOnline(); loErr != nil {
+			logger.Warning("node traffic sync: get last-online failed:", loErr)
+		}
+	} else {
+		lastOnline = make(map[string]int64, len(stats))
+		for _, ct := range stats {
+			if ct != nil {
+				lastOnline[ct.Email] = ct.LastOnline
+			}
+		}
+	}
+	if lastOnline == nil {
+		lastOnline = map[string]int64{}
+	}
+
 	online := j.inboundService.GetOnlineClients()
 	if online == nil {
 		online = []string{}
@@ -181,10 +218,8 @@ func (j *NodeTrafficSyncJob) Run() {
 	trafficPayload["nodeTraffics"] = inboundSpeed
 	websocket.BroadcastTraffic(trafficPayload)
 
-	clientStats := map[string]any{}
-	if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
-		logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
-	} else if len(stats) > 0 {
+	clientStats := map[string]any{"snapshot": snapshot}
+	if len(stats) > 0 {
 		clientStats["clients"] = stats
 	}
 	if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
@@ -192,7 +227,7 @@ func (j *NodeTrafficSyncJob) Run() {
 	} else if len(summary) > 0 {
 		clientStats["inbounds"] = summary
 	}
-	if len(clientStats) > 0 {
+	if len(clientStats) > 1 {
 		websocket.BroadcastClientStats(clientStats)
 	}
 
@@ -318,11 +353,14 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod
 	wg.Wait()
 }
 
-func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
+// syncOne pulls one node's traffic snapshot and merges it. It returns the
+// emails online on that node this tick, feeding the delta broadcast above the
+// snapshot threshold; nil on any failure path.
+func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) []string {
 	rt, err := mgr.RemoteFor(n)
 	if err != nil {
 		logger.Warningf("node traffic sync: remote lookup failed for %s: %v", n.Name, err)
-		return
+		return nil
 	}
 
 	if n.ConfigDirty {
@@ -331,7 +369,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
 		reconcileCancel()
 		if reconcileErr != nil {
 			logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr)
-			return
+			return nil
 		}
 		if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
 			logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
@@ -346,21 +384,27 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
 	if err != nil {
 		logger.Warningf("node traffic sync: fetch from %s failed: %v", n.Name, err)
 		j.inboundService.ClearNodeOnlineClients(n.Id)
-		return
+		return nil
 	}
 	service.FilterNodeSnapshot(n, snap)
 	_, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
 	changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
 	if err != nil {
 		logger.Warningf("node traffic sync: merge for %s failed: %v", n.Name, err)
-		return
+		return nil
 	}
 	if changed {
 		j.structural.set()
 	}
 
+	active := make([]string, 0, len(snap.OnlineEmails))
+	active = append(active, snap.OnlineEmails...)
+	for _, emails := range snap.OnlineTree {
+		active = append(active, emails...)
+	}
+
 	if !doIpSync {
-		return
+		return active
 	}
 
 	ipCtx, ipCancel := context.WithTimeout(context.Background(), nodeClientIpSyncTimeout)
@@ -378,7 +422,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
 	masterIps, err := j.inboundService.GetAllInboundClientIps()
 	if err != nil {
 		logger.Warningf("node traffic sync: load client ips for push to %s failed: %v", n.Name, err)
-		return
+		return active
 	}
 	if len(masterIps) > 0 {
 		if err := rt.PushAllClientIps(ipCtx, masterIps); err != nil {
@@ -406,4 +450,5 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
 			}
 		}
 	}
+	return active
 }

+ 46 - 8
internal/web/job/xray_traffic_job.go

@@ -20,6 +20,14 @@ type XrayTrafficJob struct {
 	outboundService outbound.OutboundService
 }
 
+// clientStatsSnapshotMaxClients caps how many client_traffics rows the job
+// ships as a full websocket snapshot per poll (same spirit as the
+// controller's broadcastInboundsUpdateClientLimit). Above it, a snapshot
+// would blow past the hub's payload cap and be dropped wholesale, so the job
+// broadcasts only this poll's active rows and the UI leans on its 5s REST
+// refetch for the rest.
+const clientStatsSnapshotMaxClients = 5000
+
 // NewXrayTrafficJob creates a new traffic collection job instance.
 func NewXrayTrafficJob() *XrayTrafficJob {
 	return new(XrayTrafficJob)
@@ -116,9 +124,41 @@ func (j *XrayTrafficJob) Run() {
 		return
 	}
 
-	lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
-	if err != nil {
-		logger.Warning("get clients last online failed:", err)
+	// Small installs broadcast the full snapshot (see GetAllClientTraffics for
+	// why deltas alone left UI rows stale). Above the threshold the snapshot
+	// would be dropped by the hub's payload cap anyway, so ship this poll's
+	// active rows instead and scope last-online to them; the initial full map
+	// still arrives over REST.
+	snapshot := true
+	if total, countErr := j.inboundService.CountClientTraffics(); countErr != nil {
+		logger.Warning("count client traffics for websocket failed:", countErr)
+	} else if total > clientStatsSnapshotMaxClients {
+		snapshot = false
+	}
+
+	var stats []*xray.ClientTraffic
+	var statsErr error
+	if snapshot {
+		stats, statsErr = j.inboundService.GetAllClientTraffics()
+	} else {
+		stats, statsErr = j.inboundService.GetActiveClientTraffics(activeEmails)
+	}
+	if statsErr != nil {
+		logger.Warning("get client traffics for websocket failed:", statsErr)
+	}
+
+	var lastOnlineMap map[string]int64
+	if snapshot {
+		if lastOnlineMap, err = j.inboundService.GetClientsLastOnline(); err != nil {
+			logger.Warning("get clients last online failed:", err)
+		}
+	} else {
+		lastOnlineMap = make(map[string]int64, len(stats))
+		for _, ct := range stats {
+			if ct != nil {
+				lastOnlineMap[ct.Email] = ct.LastOnline
+			}
+		}
 	}
 	if lastOnlineMap == nil {
 		lastOnlineMap = make(map[string]int64)
@@ -136,10 +176,8 @@ func (j *XrayTrafficJob) Run() {
 		"lastOnlineMap":  lastOnlineMap,
 	})
 
-	clientStatsPayload := map[string]any{}
-	if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
-		logger.Warning("get all client traffics for websocket failed:", err)
-	} else if len(stats) > 0 {
+	clientStatsPayload := map[string]any{"snapshot": snapshot}
+	if len(stats) > 0 {
 		clientStatsPayload["clients"] = stats
 	}
 	if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
@@ -147,7 +185,7 @@ func (j *XrayTrafficJob) Run() {
 	} else if len(inboundSummary) > 0 {
 		clientStatsPayload["inbounds"] = inboundSummary
 	}
-	if len(clientStatsPayload) > 0 {
+	if len(clientStatsPayload) > 1 {
 		websocket.BroadcastClientStats(clientStatsPayload)
 	}
 

+ 15 - 4
internal/web/service/inbound_traffic.go

@@ -928,14 +928,18 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien
 		}
 		traffics = append(traffics, page...)
 	}
+	overlayGlobalTraffic(db, traffics)
 	return traffics, nil
 }
 
 // GetAllClientTraffics returns the full set of client_traffics rows so the
-// websocket broadcasters can ship a complete snapshot every cycle. The old
-// delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
-// the per-client section whenever no client moved bytes in the cycle or a
-// node sync failed, leaving client rows in the UI stuck at stale numbers.
+// websocket broadcasters can ship a complete snapshot every cycle. A pure
+// delta path silently dropped the per-client section whenever no client moved
+// bytes in the cycle or a node sync failed, leaving client rows in the UI
+// stuck at stale numbers — so small installs broadcast this snapshot, and only
+// above the traffic job's snapshot threshold (where the marshaled snapshot
+// would exceed the hub's payload cap and be dropped wholesale) does the job
+// fall back to active-row deltas.
 func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
 	db := database.GetDB()
 	var traffics []*xray.ClientTraffic
@@ -946,6 +950,13 @@ func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
 	return traffics, nil
 }
 
+func (s *InboundService) CountClientTraffics() (int64, error) {
+	db := database.GetDB()
+	var count int64
+	err := db.Model(xray.ClientTraffic{}).Count(&count).Error
+	return count, err
+}
+
 type InboundTrafficSummary struct {
 	Id     int   `json:"id"`
 	Up     int64 `json:"up"`

+ 5 - 4
internal/web/websocket/notifier.go

@@ -45,10 +45,11 @@ func BroadcastTraffic(traffic any) {
 	}
 }
 
-// BroadcastClientStats broadcasts absolute per-client traffic counters for the
-// clients that had activity in the latest collection window. Use this instead
-// of re-broadcasting the full inbound list — it scales to 10k+ clients because
-// the payload only includes active rows (typically a fraction of total).
+// BroadcastClientStats broadcasts absolute per-client traffic counters. Small
+// installs send the complete row set each cycle (payload key snapshot=true);
+// above the traffic job's snapshot threshold only the rows active in the
+// latest collection window are sent (snapshot=false), which keeps the payload
+// under the hub's cap at any client count.
 func BroadcastClientStats(stats any) {
 	if hub := GetHub(); hub != nil {
 		hub.Broadcast(MessageTypeClientStats, stats)