Browse Source

fix(node-sync): keep node baseline while a sibling inbound still reports the email (#5202)

The orphan sweeps in setRemoteTrafficLocked deleted the (node, email)
baseline row unconditionally whenever an email was missing from one
inbound's snapshot stats — even though baselines are keyed per node, not
per inbound. For a client attached to two inbounds of the same node whose
stats the node reports under only one of them, the sweep for the other
inbound deleted the baseline at the end of every sync cycle. Depending on
inbound order, the baseline written earlier in the same transaction was
wiped each time, so the next cycle computed delta against a missing
baseline (zero) and the client's traffic froze permanently.

Scope both sweeps to the union of emails across the whole snapshot: a
baseline is only dropped when the email left the node entirely.
MHSanaei 18 giờ trước cách đây
mục cha
commit
21143a6d72

+ 32 - 1
internal/web/service/inbound_node.go

@@ -262,6 +262,22 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 		}
 	}
 
+	// Union of every email the snapshot still reports, across all inbounds.
+	// The (node, email) baseline rows are keyed per node, not per inbound, so
+	// the sweeps below must only drop one when the email left the node
+	// entirely — an email whose stats moved to (or always lived under) a
+	// sibling inbound still needs its baseline for the sibling's delta
+	// computation (#5202).
+	snapEmailsAll := make(map[string]struct{})
+	for _, snapIb := range snap.Inbounds {
+		if snapIb == nil {
+			continue
+		}
+		for i := range snapIb.ClientStats {
+			snapEmailsAll[snapIb.ClientStats[i].Email] = struct{}{}
+		}
+	}
+
 	tx := db.Begin()
 	committed := false
 	defer func() {
@@ -421,9 +437,17 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			return false, err
 		}
 		if len(goneEmails) > 0 {
+			// Baselines are per (node, email), not per inbound: keep them for
+			// emails the snapshot still reports under a sibling inbound (#5202).
+			baselineGone := make([]string, 0, len(goneEmails))
+			for _, e := range goneEmails {
+				if _, still := snapEmailsAll[e]; !still {
+					baselineGone = append(baselineGone, e)
+				}
+			}
 			// Chunk to avoid SQLite bind var limit when a node has many clients
 			// removed (e.g. after API bulk delete or structural change on node inbound).
-			for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
+			for _, batch := range chunkStrings(baselineGone, sqliteMaxVars) {
 				if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
 					Delete(&model.NodeClientTraffic{}).Error; err != nil {
 					return false, err
@@ -554,6 +578,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			if _, kept := snapEmails[k.email]; kept {
 				continue
 			}
+			// Gone from this inbound's stats but still reported by the node under
+			// a sibling inbound: both the shared accumulator row and the (node,
+			// email) baseline must survive, or the sibling's next delta would
+			// compute against nothing and freeze the counter (#5202).
+			if _, still := snapEmailsAll[k.email]; still {
+				continue
+			}
 			if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
 				Delete(&model.NodeClientTraffic{}).Error; err != nil {
 				return false, err

+ 48 - 0
internal/web/service/node_client_traffic_sum_test.go

@@ -226,6 +226,54 @@ func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
 	assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing")
 }
 
+// TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
+// sweep bug behind #5202: the client is attached to two inbounds of the SAME
+// node, the node reports its stats under n1-a only, but the master-side row
+// is owned by n1-b's mirror. The per-email sweep for n1-b must not drop the
+// (node, email) baseline that n1-a's delta computation needs — doing so every
+// cycle froze the client's counter permanently.
+func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) {
+	db := initTrafficTestDB(t)
+	createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "fresh")
+	createNodeInbound(t, db, 1, "n1-b", 41002)
+	svc := &InboundService{}
+
+	const email = "fresh"
+	var ibB model.Inbound
+	if err := db.Where("tag = ?", "n1-b").First(&ibB).Error; err != nil {
+		t.Fatalf("load n1-b: %v", err)
+	}
+	// Master-side row created when the client was added on the panel, owned by
+	// n1-b's mirror (e.g. the client form targeted that inbound).
+	if err := db.Create(&xray.ClientTraffic{InboundId: ibB.Id, Email: email, Enable: true}).Error; err != nil {
+		t.Fatalf("seed master row: %v", err)
+	}
+
+	settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
+	sync := func(up, down int64) {
+		t.Helper()
+		snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{
+			{Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: up, Down: down, Enable: true}}},
+			{Tag: "n1-b", Settings: `{"clients": []}`},
+		}}
+		if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil {
+			t.Fatalf("sync: %v", err)
+		}
+	}
+
+	sync(630, 630)
+	var baselines int64
+	if err := db.Model(&model.NodeClientTraffic{}).Where("node_id = ? AND email = ?", 1, email).Count(&baselines).Error; err != nil {
+		t.Fatalf("count baselines: %v", err)
+	}
+	if baselines != 1 {
+		t.Fatalf("baseline must survive the sibling-inbound sweep, found %d rows", baselines)
+	}
+
+	sync(700, 700)
+	assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives")
+}
+
 func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
 	db := initTrafficTestDB(t)
 	svc := &InboundService{}