Преглед изворни кода

fix(node): never re-add a node's full counter on reset/restart (#5456, #5476, #5390)

When a node's per-client counter dips below the master's stored baseline
(node reboot, xray restart, or a reset propagated to the node), the delta
accounting clamped delta to the node's whole current counter and re-added it
to the master total — double-counting a client's lifetime usage in a single
sync and often pushing them over quota. Treat a backward-moving counter as a
reset: add 0 and rebaseline to the reported value, so only genuine post-reset
usage accrues.

Resets also now clear the per-node NodeClientTraffic baseline (ResetClient
TrafficByEmail, resetClientTrafficLocked, BulkResetTraffic, resetAllClient
TrafficsLocked), mirroring the delete paths. Without this the node's pre-reset
cumulative — including traffic it had counted but not yet synced — leaks back
onto the master after a reset, which is the 'reset reverts after a while'
report. The next sync then takes the clean delta=0 + rebaseline path regardless
of node state.

Updates TestNodeCounterReset (was _Clamped, now _NoReAdd) to assert rebaseline
instead of re-add, and adds TestCentralResetClearsNodeBaseline_NoLeak.
MHSanaei пре 12 часа
родитељ
комит
e59788bac1

+ 15 - 1
internal/web/service/client_traffic.go

@@ -101,7 +101,15 @@ func (s *ClientService) BulkResetTraffic(inboundSvc *InboundService, emails []st
 				}
 				}
 				affected += int(res.RowsAffected)
 				affected += int(res.RowsAffected)
 			}
 			}
-			return clearGlobalTraffic(tx, cleanEmails...)
+			if err := clearGlobalTraffic(tx, cleanEmails...); err != nil {
+				return err
+			}
+			for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
+				if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
+					return err
+				}
+			}
+			return nil
 		})
 		})
 	})
 	})
 	if err != nil {
 	if err != nil {
@@ -154,6 +162,12 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error {
 			return err
 			return err
 		}
 		}
 
 
+		for _, batch := range chunkStrings(resetEmails, sqlInChunk) {
+			if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
+				return err
+			}
+		}
+
 		inboundWhereText := "id "
 		inboundWhereText := "id "
 		if id == -1 {
 		if id == -1 {
 			inboundWhereText += " > ?"
 			inboundWhereText += " > ?"

+ 2 - 2
internal/web/service/inbound_node.go

@@ -571,10 +571,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			var deltaUp, deltaDown int64
 			var deltaUp, deltaDown int64
 			if seen {
 			if seen {
 				if deltaUp = canon.Up - base.Up; deltaUp < 0 {
 				if deltaUp = canon.Up - base.Up; deltaUp < 0 {
-					deltaUp = canon.Up
+					deltaUp = 0
 				}
 				}
 				if deltaDown = canon.Down - base.Down; deltaDown < 0 {
 				if deltaDown = canon.Down - base.Down; deltaDown < 0 {
-					deltaDown = canon.Down
+					deltaDown = 0
 				}
 				}
 			}
 			}
 
 

+ 8 - 2
internal/web/service/inbound_traffic.go

@@ -506,9 +506,12 @@ func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
 		if err := clearGlobalTraffic(db, clientEmail); err != nil {
 		if err := clearGlobalTraffic(db, clientEmail); err != nil {
 			return err
 			return err
 		}
 		}
-		return db.Model(xray.ClientTraffic{}).
+		if err := db.Model(xray.ClientTraffic{}).
 			Where("email = ?", clientEmail).
 			Where("email = ?", clientEmail).
-			Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
+			Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error; err != nil {
+			return err
+		}
+		return db.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error
 	})
 	})
 }
 }
 
 
@@ -602,6 +605,9 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b
 	if err := clearGlobalTraffic(db, clientEmail); err != nil {
 	if err := clearGlobalTraffic(db, clientEmail); err != nil {
 		return false, err
 		return false, err
 	}
 	}
+	if err := db.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error; err != nil {
+		return false, err
+	}
 
 
 	now := time.Now().UnixMilli()
 	now := time.Now().UnixMilli()
 	_ = db.Model(model.Inbound{}).
 	_ = db.Model(model.Inbound{}).

+ 46 - 3
internal/web/service/node_client_traffic_sum_test.go

@@ -169,7 +169,7 @@ func TestGhostData_NoPhantomTraffic(t *testing.T) {
 	assertUpDown(t, readTraffic(t, db, email), 1024, 2048, "only incremental traffic beyond baseline counts")
 	assertUpDown(t, readTraffic(t, db, email), 1024, 2048, "only incremental traffic beyond baseline counts")
 }
 }
 
 
-func TestNodeCounterReset_Clamped(t *testing.T) {
+func TestNodeCounterReset_NoReAdd(t *testing.T) {
 	db := initTrafficTestDB(t)
 	db := initTrafficTestDB(t)
 	createNodeInbound(t, db, 1, "n1-in", 41001)
 	createNodeInbound(t, db, 1, "n1-in", 41001)
 	svc := &InboundService{}
 	svc := &InboundService{}
@@ -180,13 +180,19 @@ func TestNodeCounterReset_Clamped(t *testing.T) {
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
 	assertUpDown(t, readTraffic(t, db, email), 50, 50, "before node reset")
 	assertUpDown(t, readTraffic(t, db, email), 50, 50, "before node reset")
 
 
-	// Counter resets to 50 (Xray restart). delta=50-950=-900 → clamped → adds 50.
+	// Node reboot drops the counter to 50. delta=50-950=-900 is a counter reset,
+	// not new traffic: add 0 and rebaseline to 50, never re-add the node's full
+	// cumulative counter onto the master total (#5456).
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
 	ct := readTraffic(t, db, email)
 	ct := readTraffic(t, db, email)
 	if ct.Up < 0 || ct.Down < 0 {
 	if ct.Up < 0 || ct.Down < 0 {
 		t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
 		t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
 	}
 	}
-	assertUpDown(t, ct, 100, 100, "after node counter reset (clamped)")
+	assertUpDown(t, ct, 50, 50, "after node counter reset: rebaselined, not re-added")
+
+	// Post-reset accrual resumes from the new baseline: 80-50=30.
+	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 80, Down: 80, Enable: true})
+	assertUpDown(t, readTraffic(t, db, email), 80, 80, "post-reset delta accrues from rebaselined counter")
 }
 }
 
 
 func TestCentralReset_NoReAdd(t *testing.T) {
 func TestCentralReset_NoReAdd(t *testing.T) {
@@ -212,6 +218,43 @@ func TestCentralReset_NoReAdd(t *testing.T) {
 	assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue")
 	assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue")
 }
 }
 
 
+// A real reset (ResetClientTrafficByEmail) must clear the per-node baseline so
+// the node's pre-reset cumulative — including traffic it counted but had not yet
+// synced — cannot leak back onto the master after the reset (#5476, #5390).
+func TestCentralResetClearsNodeBaseline_NoLeak(t *testing.T) {
+	db := initTrafficTestDB(t)
+	createNodeInbound(t, db, 1, "n1-in", 41001)
+	StartTrafficWriter()
+	svc := &InboundService{}
+
+	const email = "reset-revert"
+	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
+	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
+	assertUpDown(t, readTraffic(t, db, email), 200, 200, "before reset")
+
+	if err := svc.ResetClientTrafficByEmail(email); err != nil {
+		t.Fatalf("ResetClientTrafficByEmail: %v", err)
+	}
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "right after reset")
+
+	var baselines int64
+	if err := db.Model(&model.NodeClientTraffic{}).Where("email = ?", email).Count(&baselines).Error; err != nil {
+		t.Fatalf("count baselines: %v", err)
+	}
+	if baselines != 0 {
+		t.Fatalf("reset must clear node baseline rows, found %d", baselines)
+	}
+
+	// Node still reports its pre-reset cumulative (340 > last synced 300: usage it
+	// had not synced before the reset). It must not revert the reset.
+	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 340, Down: 340, Enable: true})
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "stale node counter must not revert reset")
+
+	// Genuine post-reset usage accrues from the rebaselined counter: 370-340=30.
+	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 370, Down: 370, Enable: true})
+	assertUpDown(t, readTraffic(t, db, email), 30, 30, "post-reset usage accrues")
+}
+
 func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
 func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
 	db := initTrafficTestDB(t)
 	db := initTrafficTestDB(t)
 	createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
 	createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")