Browse Source

fix(inbounds): refresh client rows live over websocket

Two bugs combined to leave per-client traffic / remained / all-time
columns stuck at stale numbers while only the inbound-level row and
the online badge refreshed:

1. Backend (xray + node sync traffic jobs) only included the per-client
   array in the client_stats broadcast when activeEmails / touched
   was non-empty. Cycles with no client deltas — or any node sync that
   failed to fetch a snapshot — shipped only the inbound summary, so
   the frontend had nothing to merge for clients. Replaced both code
   paths with a single GetAllClientTraffics() snapshot per cycle; the
   broadcast now always carries the full client list.

2. Frontend mutated dbInbound.clientStats[i] in place. DBInbound is a
   plain class instance (not wrapped in reactive()), so Vue could not
   see the field-level changes and ClientRowTable's statsMap computed
   stayed cached forever. Added a statsVersion tick bumped on every
   merge and read inside statsMap so the computed re-evaluates and the
   template pulls fresh up/down/allTime/expiryTime each push.

Removed the now-dead emailSet helper from node_traffic_sync_job and
the activeEmails filter from xray_traffic_job.
MHSanaei 1 day ago
parent
commit
2551a673c3

+ 5 - 0
frontend/src/pages/inbounds/ClientRowTable.vue

@@ -33,6 +33,7 @@ const props = defineProps({
   isDarkTheme: { type: Boolean, default: false },
   pageSize: { type: Number, default: 0 },
   totalClientCount: { type: Number, default: 0 },
+  statsVersion: { type: Number, default: 0 },
 });
 
 const emit = defineEmits([
@@ -63,7 +64,11 @@ watch([clients, () => props.pageSize], () => {
 });
 
 // === Per-client stats lookup =======================================
+// statsVersion bumps on every ws merge so this computed re-evaluates
+// (DBInbound isn't reactive — the in-place stat mutations alone don't
+// trigger Vue's tracking).
 const statsMap = computed(() => {
+  void props.statsVersion;
   const m = new Map();
   for (const cs of (props.dbInbound.clientStats || [])) m.set(cs.email, cs);
   return m;

+ 3 - 0
frontend/src/pages/inbounds/InboundList.vue

@@ -50,6 +50,7 @@ const props = defineProps({
   // inbound row can render its node name without an extra fetch.
   nodesById: { type: Map, default: () => new Map() },
   hasActiveNode: { type: Boolean, default: false },
+  statsVersion: { type: Number, default: 0 },
 });
 
 const emit = defineEmits([
@@ -468,6 +469,7 @@ function showQrCodeMenu(dbInbound) {
             <ClientRowTable :db-inbound="record" :is-mobile="true" :traffic-diff="trafficDiff" :expire-diff="expireDiff"
               :online-clients="onlineClients" :last-online-map="lastOnlineMap" :is-dark-theme="isDarkTheme"
               :page-size="pageSize" :total-client-count="clientCount[record.id]?.clients || 0"
+              :stats-version="statsVersion"
               @edit-client="(p) => emit('edit-client', p)" @qrcode-client="(p) => emit('qrcode-client', p)"
               @info-client="(p) => emit('info-client', p)"
               @reset-traffic-client="(p) => emit('reset-traffic-client', p)"
@@ -557,6 +559,7 @@ function showQrCodeMenu(dbInbound) {
             :traffic-diff="trafficDiff" :expire-diff="expireDiff" :online-clients="onlineClients"
             :last-online-map="lastOnlineMap" :is-dark-theme="isDarkTheme" :page-size="pageSize"
             :total-client-count="clientCount[record.id]?.clients || 0"
+            :stats-version="statsVersion"
             @edit-client="(p) => emit('edit-client', p)"
             @qrcode-client="(p) => emit('qrcode-client', p)" @info-client="(p) => emit('info-client', p)"
             @reset-traffic-client="(p) => emit('reset-traffic-client', p)"

+ 2 - 0
frontend/src/pages/inbounds/InboundsPage.vue

@@ -45,6 +45,7 @@ const {
   ipLimitEnable,
   remarkModel,
   lastOnlineMap,
+  statsVersion,
   refresh,
   fetchDefaultSettings,
   applyTrafficEvent,
@@ -648,6 +649,7 @@ function onRowAction({ key, dbInbound }) {
                   :last-online-map="lastOnlineMap" :is-dark-theme="themeState.isDark" :expire-diff="expireDiff"
                   :traffic-diff="trafficDiff" :page-size="pageSize" :is-mobile="isMobile"
                   :sub-enable="subSettings.enable" :nodes-by-id="nodesById" :has-active-node="hasActiveNode"
+                  :stats-version="statsVersion"
                   @refresh="refresh"
                   @add-inbound="onAddInbound" @general-action="onGeneralAction" @row-action="onRowAction"
                   @edit-client="onEditClient" @qrcode-client="onQrcodeClient" @info-client="onInfoClient"

+ 10 - 3
frontend/src/pages/inbounds/useInbounds.js

@@ -23,6 +23,11 @@ export function useInbounds() {
   const clientCount = ref({});
   const onlineClients = ref([]);
   const lastOnlineMap = ref({});
+  // Bumps on every client_stats merge so the per-inbound ClientRowTable
+  // child can re-render. DBInbound is a plain class instance, not reactive,
+  // so the in-place mutations on its clientStats array are invisible to
+  // Vue's tracking unless something else (this tick) signals the change.
+  const statsVersion = ref(0);
 
   // Default-settings sidecar fields the table needs for color/expiry math.
   const expireDiff = ref(0);
@@ -173,9 +178,9 @@ export function useInbounds() {
     rebuildClientCount();
   }
 
-  // The client_stats payload carries absolute traffic counters for the
-  // clients that had activity in the latest window plus per-inbound
-  // totals. Both are absolute (not deltas), so we overwrite in place.
+  // The client_stats payload carries absolute traffic counters for every
+  // client + per-inbound totals (full snapshot, not deltas). Both are
+  // overwritten in place.
   function applyClientStatsEvent(payload) {
     if (!payload || typeof payload !== 'object') return;
     let touched = false;
@@ -220,6 +225,7 @@ export function useInbounds() {
     }
 
     if (touched) {
+      statsVersion.value++;
       dbInbounds.value = [...dbInbounds.value];
       rebuildClientCount();
     }
@@ -315,6 +321,7 @@ export function useInbounds() {
     clientCount,
     onlineClients,
     lastOnlineMap,
+    statsVersion,
     totals,
     expireDiff,
     trafficDiff,

+ 6 - 51
web/job/node_traffic_sync_job.go

@@ -43,36 +43,6 @@ func (a *atomicBool) takeAndReset() bool {
 	return v
 }
 
-type emailSet struct {
-	mu sync.Mutex
-	m  map[string]struct{}
-}
-
-func newEmailSet() *emailSet { return &emailSet{m: make(map[string]struct{})} }
-
-func (s *emailSet) addAll(emails []string) {
-	if len(emails) == 0 {
-		return
-	}
-	s.mu.Lock()
-	for _, e := range emails {
-		if e != "" {
-			s.m[e] = struct{}{}
-		}
-	}
-	s.mu.Unlock()
-}
-
-func (s *emailSet) slice() []string {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	out := make([]string, 0, len(s.m))
-	for e := range s.m {
-		out = append(out, e)
-	}
-	return out
-}
-
 func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
 	return &NodeTrafficSyncJob{}
 }
@@ -97,7 +67,6 @@ func (j *NodeTrafficSyncJob) Run() {
 		return
 	}
 
-	touched := newEmailSet()
 	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
 	var wg sync.WaitGroup
 	for _, n := range nodes {
@@ -109,7 +78,7 @@ func (j *NodeTrafficSyncJob) Run() {
 		go func(n *model.Node) {
 			defer wg.Done()
 			defer func() { <-sem }()
-			j.syncOne(mgr, n, touched)
+			j.syncOne(mgr, n)
 		}(n)
 	}
 	wg.Wait()
@@ -135,12 +104,10 @@ func (j *NodeTrafficSyncJob) Run() {
 	})
 
 	clientStats := map[string]any{}
-	if emails := touched.slice(); len(emails) > 0 {
-		if stats, err := j.inboundService.GetActiveClientTraffics(emails); err != nil {
-			logger.Warning("node traffic sync: get client traffics for websocket failed:", err)
-		} else if len(stats) > 0 {
-			clientStats["clients"] = stats
-		}
+	if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
+		logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
+	} else if len(stats) > 0 {
+		clientStats["clients"] = stats
 	}
 	if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
 		logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
@@ -156,7 +123,7 @@ func (j *NodeTrafficSyncJob) Run() {
 	}
 }
 
-func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, touched *emailSet) {
+func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
 	ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
 	defer cancel()
 
@@ -179,16 +146,4 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, touche
 	if changed {
 		j.structural.set()
 	}
-	for _, ib := range snap.Inbounds {
-		if ib == nil {
-			continue
-		}
-		emails := make([]string, 0, len(ib.ClientStats))
-		for _, cs := range ib.ClientStats {
-			if cs.Email != "" {
-				emails = append(emails, cs.Email)
-			}
-		}
-		touched.addAll(emails)
-	}
 }

+ 9 - 31
web/job/xray_traffic_job.go

@@ -95,18 +95,16 @@ func (j *XrayTrafficJob) Run() {
 		"lastOnlineMap":  lastOnlineMap,
 	})
 
-	// Compact delta payload: per-client absolute counters for clients active
-	// this cycle, plus inbound-level absolute totals. Frontend applies both
-	// in-place — typical payload ~10–50KB even for 10k+ client deployments.
-	// Replaces the old full-inbound-list broadcast that hit WS size limits
-	// (5–10MB) and forced the frontend into a REST refetch.
+	// Full snapshot every cycle: absolute per-client counters and inbound
+	// totals. Frontend overwrites both in place. The previous delta path
+	// (activeEmails -> GetActiveClientTraffics) silently omitted the
+	// clients array whenever nobody moved bytes in the cycle, leaving the
+	// client rows in the UI stuck at stale traffic/remained/all-time.
 	clientStatsPayload := map[string]any{}
-	if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 {
-		if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil {
-			logger.Warning("get active client traffics for websocket failed:", err)
-		} else if len(stats) > 0 {
-			clientStatsPayload["clients"] = stats
-		}
+	if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
+		logger.Warning("get all client traffics for websocket failed:", err)
+	} else if len(stats) > 0 {
+		clientStatsPayload["clients"] = stats
 	}
 	if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
 		logger.Warning("get inbounds traffic summary for websocket failed:", err)
@@ -126,26 +124,6 @@ func (j *XrayTrafficJob) Run() {
 	}
 }
 
-// activeEmails returns the set of client emails that had non-zero traffic in
-// the current collection window. Idle clients are skipped — no need to push
-// their (unchanged) counters to the frontend.
-func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
-	if len(clientTraffics) == 0 {
-		return nil
-	}
-	emails := make([]string, 0, len(clientTraffics))
-	for _, ct := range clientTraffics {
-		if ct == nil || ct.Email == "" {
-			continue
-		}
-		if ct.Up == 0 && ct.Down == 0 {
-			continue
-		}
-		emails = append(emails, ct.Email)
-	}
-	return emails
-}
-
 func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
 	informURL, err := j.settingService.GetExternalTrafficInformURI()
 	if err != nil {

+ 14 - 0
web/service/inbound.go

@@ -3322,6 +3322,20 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien
 	return traffics, nil
 }
 
+// GetAllClientTraffics returns the full set of client_traffics rows so the
+// websocket broadcasters can ship a complete snapshot every cycle. The old
+// delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
+// the per-client section whenever no client moved bytes in the cycle or a
+// node sync failed, leaving client rows in the UI stuck at stale numbers.
+func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
+	db := database.GetDB()
+	var traffics []*xray.ClientTraffic
+	if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
+		return nil, err
+	}
+	return traffics, nil
+}
+
 type InboundTrafficSummary struct {
 	Id      int   `json:"id"`
 	Up      int64 `json:"up"`