Explorar el Código

fix(traffic): prevent phantom quota consumption from stale node data (#5412)

Three related bugs caused inflated traffic counters and spurious quota
hits on multi-node setups, most visibly when a client email was renamed
while a node was offline or its PostgreSQL deadlocked.

**Fix 1 — phantom quota (root cause)** `setRemoteTrafficLocked`
new-row path: when master had no `client_traffics` row for an email
that a node reported, it seeded the row with `Up: canon.Up` and
`Down: canon.Down` — importing the node's full accumulated counters as
if they were fresh quota usage. If the node retained stale data from a
previously-deleted account (e.g. a failed deletion during an outage),
the ghost 50 GB appeared on the new client immediately and triggered
`disableInvalidClients` on the same tick. Fixed by seeding at `Up: 0`
and `Down: 0`; the current node value still becomes the baseline, so
only future increments count.

**Fix 2 — PostgreSQL deadlock** `addClientTraffic` did a
read-modify-write via `tx.Save(slice)`, issuing UPDATEs in slice order.
Two concurrent goroutines locking the same rows in opposite order
deadlock on PostgreSQL (SQLite avoids this with file-level
serialisation). Replaced with atomic per-email
`UPDATE SET up=up+?, down=down+?` statements. The delayed-start
ExpiryTime conversion, which `adjustTraffics` computes in memory and the
old Save path wrote to the DB as a side effect, is still persisted: a
separate `UPDATE` now writes it.

**Fix 3 & 4 — stale `inbound_id` filters** `autoRenewClients` used
`WHERE inbound_id NOT IN (node inbounds)` to skip node clients, but
`client_traffics.inbound_id` is set once on INSERT and never refreshed.
Replaced with an email-based subquery through `client_inbounds` (the
authoritative source). Also made the `settings["clients"].([]any)` type
assertion safe: the unchecked form panicked when an inbound's settings
had no `clients` array, and such inbounds are now skipped.

**Fix 5 — stale `inbound_id` in reset** `resetAllClientTrafficsLocked`
used `WHERE inbound_id = ?` to find which emails to reset; same staleness
problem. Replaced with the `client_inbounds` join for email lookup;
the `inbounds.last_traffic_reset_time` update still correctly uses the
inbound ID directly on the `inbounds` table.

Tests are updated to reflect the new seeding-at-zero semantics, and a
new `TestGhostData_NoPhantomTraffic` test reproduces the exact 50 GB
phantom scenario.
Younes hace 1 día
padre
commit
fb03b0e9f1

+ 17 - 10
internal/web/service/client_traffic.go

@@ -121,22 +121,29 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error {
 	now := time.Now().Unix() * 1000
 
 	if err := db.Transaction(func(tx *gorm.DB) error {
-		whereText := "inbound_id "
+		// client_traffics.inbound_id is stale: it reflects the inbound the row was
+		// first inserted under and is never refreshed. Use the client_inbounds join
+		// as the authoritative source for which emails belong to a given inbound.
+		var resetEmails []string
 		if id == -1 {
-			whereText += " > ?"
+			if err := tx.Model(xray.ClientTraffic{}).Pluck("email", &resetEmails).Error; err != nil {
+				return err
+			}
 		} else {
-			whereText += " = ?"
+			if err := tx.Table("client_inbounds ci").
+				Select("c.email").
+				Joins("JOIN clients c ON c.id = ci.client_id").
+				Where("ci.inbound_id = ?", id).
+				Pluck("c.email", &resetEmails).Error; err != nil {
+				return err
+			}
 		}
-
-		var resetEmails []string
-		if err := tx.Model(xray.ClientTraffic{}).
-			Where(whereText, id).
-			Pluck("email", &resetEmails).Error; err != nil {
-			return err
+		if len(resetEmails) == 0 {
+			return nil
 		}
 
 		result := tx.Model(xray.ClientTraffic{}).
-			Where(whereText, id).
+			Where("email IN ?", resetEmails).
 			Updates(map[string]any{"enable": true, "up": 0, "down": 0})
 
 		if result.Error != nil {

+ 2 - 2
internal/web/service/inbound_node.go

@@ -559,8 +559,8 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 					Total:      cs.Total,
 					ExpiryTime: cs.ExpiryTime,
 					Reset:      cs.Reset,
-					Up:         canon.Up,
-					Down:       canon.Down,
+					Up:         0,
+					Down:       0,
 					LastOnline: cs.LastOnline,
 				}
 				if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).

+ 43 - 15
internal/web/service/inbound_traffic.go

@@ -133,9 +133,7 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
 		return err
 	}
 
-	// Index by email for O(N) merge — the previous nested loop was O(N²)
-	// and dominated each cron tick on inbounds with thousands of active
-	// clients (7500 × 7500 = 56M string comparisons every 10 seconds).
+	// Index by email for O(N) merge.
 	trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
 	for i := range traffics {
 		if traffics[i] != nil {
@@ -143,21 +141,39 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
 		}
 	}
 	now := time.Now().UnixMilli()
-	for dbTraffic_index := range dbClientTraffics {
-		t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
-		if !ok {
+	// Use atomic per-row UPDATE instead of read-modify-write Save. tx.Save
+	// issues UPDATEs in slice order, which varies between concurrent callers;
+	// on PostgreSQL two transactions locking the same rows in opposite order
+	// deadlock. "SET up = up + ?" avoids the read-modify-write. NOTE(review):
+	// in a transaction row locks persist until commit; order may still matter.
+	for _, ct := range dbClientTraffics {
+		t, ok := trafficByEmail[ct.Email]
+		if !ok || (t.Up == 0 && t.Down == 0) {
 			continue
 		}
-		dbClientTraffics[dbTraffic_index].Up += t.Up
-		dbClientTraffics[dbTraffic_index].Down += t.Down
-		if t.Up+t.Down > 0 {
-			dbClientTraffics[dbTraffic_index].LastOnline = now
+		if err = tx.Exec(
+			fmt.Sprintf(
+				`UPDATE client_traffics SET up = up + ?, down = down + ?, last_online = %s WHERE email = ?`,
+				database.GreatestExpr("last_online", "?"),
+			),
+			t.Up, t.Down, now, ct.Email,
+		).Error; err != nil {
+			logger.Warning("AddClientTraffic update data ", err)
 		}
 	}
 
-	err = tx.Save(dbClientTraffics).Error
-	if err != nil {
-		logger.Warning("AddClientTraffic update data ", err)
+	// adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
+	// deadline) in-memory. Persist that conversion now since the traffic UPDATE
+	// above only touches up/down/last_online.
+	for _, ct := range dbClientTraffics {
+		if ct.ExpiryTime > 0 {
+			if err = tx.Exec(
+				`UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
+				ct.ExpiryTime, ct.Email,
+			).Error; err != nil {
+				logger.Warning("AddClientTraffic update expiry_time ", err)
+			}
+		}
 	}
 
 	return nil
@@ -272,9 +288,18 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
 	now := time.Now().Unix() * 1000
 	var err, err1 error
 
+	// Filter to clients that have at least one local inbound. Using
+	// client_traffics.inbound_id is wrong: it goes stale after an inbound is
+	// deleted/recreated and always points to the first inbound the client was
+	// attached to, so it could be a node inbound even when the client also has
+	// local inbounds. The email-based join through client_inbounds is authoritative.
 	err = tx.Model(xray.ClientTraffic{}).
 		Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
-		Where("inbound_id NOT IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NOT NULL")).
+		Where("email IN (?)", tx.Table("client_inbounds ci").
+			Select("c.email").
+			Joins("JOIN clients c ON c.id = ci.client_id").
+			Joins("JOIN inbounds i ON i.id = ci.inbound_id").
+			Where("i.node_id IS NULL")).
 		Find(&traffics).Error
 	if err != nil {
 		return false, 0, err
@@ -326,7 +351,10 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
 	for inbound_index := range inbounds {
 		settings := map[string]any{}
 		json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
-		clients := settings["clients"].([]any)
+		clients, _ := settings["clients"].([]any)
+		if len(clients) == 0 {
+			continue
+		}
 		for client_index := range clients {
 			c := clients[client_index].(map[string]any)
 			for traffic_index, traffic := range traffics {

+ 60 - 16
internal/web/service/node_client_traffic_sum_test.go

@@ -92,15 +92,17 @@ func TestTwoNodesShareEmail_SumsCorrectly(t *testing.T) {
 
 	const email = "shared"
 
+	// First-ever sync from each node: the row is created at 0 and the current
+	// node counters become the baseline. Only deltas beyond those baselines count.
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
 	syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
 
-	assertUpDown(t, readTraffic(t, db, email), 100, 100, "after baselines")
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "after baselines — historical node traffic not imported")
 
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
 	syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
 
-	assertUpDown(t, readTraffic(t, db, email), 210, 210, "after both nodes grow")
+	assertUpDown(t, readTraffic(t, db, email), 110, 110, "after both nodes grow — deltas (50+60) accrue")
 }
 
 func TestSingleNode_MirrorsCorrectly(t *testing.T) {
@@ -109,11 +111,13 @@ func TestSingleNode_MirrorsCorrectly(t *testing.T) {
 	svc := &InboundService{}
 
 	const email = "solo"
+	// First sync: row seeded at 0, current counter becomes the baseline.
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 500, 600, "first sync")
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync — historical traffic not imported")
 
+	// Second sync: delta (700-500=200 / 800-600=200) accrues normally.
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 700, 800, "second sync mirrors cumulative")
+	assertUpDown(t, readTraffic(t, db, email), 200, 200, "second sync — delta accrues")
 }
 
 func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
@@ -137,22 +141,52 @@ func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
 	assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
 }
 
+// TestGhostData_NoPhantomTraffic reproduces the 50 GB phantom traffic bug:
+// a node retains stale data for a deleted client (e.g. de-1 was offline during
+// deletion). When the master first syncs this email again it must NOT import the
+// stale counter — it sets Up=0 and records the current node value as the
+// baseline so only future deltas count.
+func TestGhostData_NoPhantomTraffic(t *testing.T) {
+	db := initTrafficTestDB(t)
+	createNodeInbound(t, db, 1, "de1-in", 41001)
+	svc := &InboundService{}
+
+	const email = "pouya"
+	const staleBytes int64 = 50 * 1024 * 1024 * 1024 // 50 GB stale on de-1
+
+	// Node reports a client with a large pre-existing counter (ghost data from a
+	// deleted account that survived on the node). Master has no row for this email.
+	syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes, Down: staleBytes, Enable: true})
+
+	ct := readTraffic(t, db, email)
+	if ct.Up >= staleBytes || ct.Down >= staleBytes {
+		t.Errorf("ghost data imported: up=%d down=%d; stale node counter must not count as new traffic", ct.Up, ct.Down)
+	}
+	assertUpDown(t, ct, 0, 0, "first sync of ghost email — imported as 0, not 50 GB")
+
+	// Subsequent syncs only add incremental usage beyond the baseline.
+	syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes + 1024, Down: staleBytes + 2048, Enable: true})
+	assertUpDown(t, readTraffic(t, db, email), 1024, 2048, "only incremental traffic beyond baseline counts")
+}
+
 func TestNodeCounterReset_Clamped(t *testing.T) {
 	db := initTrafficTestDB(t)
 	createNodeInbound(t, db, 1, "n1-in", 41001)
 	svc := &InboundService{}
 
 	const email = "restart"
+	// First sync seeds at 0 (baseline=900). Second sync adds delta 950-900=50.
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 950, 950, "before node reset")
+	assertUpDown(t, readTraffic(t, db, email), 50, 50, "before node reset")
 
+	// Counter resets to 50 (Xray restart). delta=50-950=-900 → clamped → adds 50.
 	syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
 	ct := readTraffic(t, db, email)
 	if ct.Up < 0 || ct.Down < 0 {
 		t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
 	}
-	assertUpDown(t, ct, 1000, 1000, "after node counter reset (clamped)")
+	assertUpDown(t, ct, 100, 100, "after node counter reset (clamped)")
 }
 
 func TestCentralReset_NoReAdd(t *testing.T) {
@@ -186,11 +220,13 @@ func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
 
 	const email = "shared"
 	settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
+	// Baselines set: node1=100, node2=200. Row seeded at 0.
 	syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
 	syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
+	// node1 delta=50, node2 delta=60 → total=110.
 	syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
 	syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 210, 210, "baseline sum")
+	assertUpDown(t, readTraffic(t, db, email), 110, 110, "baseline sum")
 
 	// Node 1 rebuilt (reinstall / another master's reconcile): its inbound
 	// vanishes from the snapshot. The shared accumulator must survive — losing
@@ -199,11 +235,11 @@ func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
 	if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
 		t.Fatalf("sync node 1 with empty snapshot: %v", err)
 	}
-	assertUpDown(t, readTraffic(t, db, email), 210, 210, "after node 1 inbound removal")
+	assertUpDown(t, readTraffic(t, db, email), 110, 110, "after node 1 inbound removal")
 
-	// Node 2 keeps accruing onto the surviving row.
+	// Node 2 keeps accruing onto the surviving row. node2 delta=300-260=40 → 150.
 	syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 250, 250, "after node 2 grows")
+	assertUpDown(t, readTraffic(t, db, email), 150, 150, "after node 2 grows")
 }
 
 func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
@@ -214,16 +250,18 @@ func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
 
 	const email = "shared"
 	settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
+	// First syncs set baselines (node1=100, node2=200). Row seeded at 0.
 	syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
 	syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
 
 	// Client detached from node 1's inbound only: its stats vanish from that
 	// inbound's snapshot while node 2 still hosts the email.
 	syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
-	assertUpDown(t, readTraffic(t, db, email), 100, 100, "after client left node 1")
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "after client left node 1 — row unchanged at 0")
 
+	// Node 2 delta: 240-200=40 → accrues to 40.
 	syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
-	assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing")
+	assertUpDown(t, readTraffic(t, db, email), 40, 40, "node 2 keeps accruing")
 }
 
 // TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
@@ -306,15 +344,21 @@ func TestMultiAttach_SameNode_DivergentSiblings(t *testing.T) {
 		}
 	}
 
+	// First-ever sync: row seeded at 0, canon (max of siblings = 100) becomes the
+	// baseline. The node total is NOT imported as historical traffic — this prevents
+	// ghost data from a previously-deleted account being counted as real usage.
 	sync(100, 50, 80)
-	assertUpDown(t, readTraffic(t, db, email), 100, 100, "first sync counts the node total once, not the sum")
+	assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync seeds at 0 — not the sum (230) nor the max (100)")
 
+	// Second sync: canon grew from 100→150, delta=50 accrues. Siblings 60 and 90
+	// do not re-add their full values — the canon clamp prevents inflation.
 	sync(150, 60, 90)
-	assertUpDown(t, readTraffic(t, db, email), 150, 150, "second sync: grew by 50, not by every sibling")
+	assertUpDown(t, readTraffic(t, db, email), 50, 50, "second sync: only the 50-unit increment counts, not every sibling")
 
-	// Equal siblings (the healthy current-schema case) must still accrue once.
+	// Equal siblings (the healthy current-schema case): canon grew from 150→200,
+	// delta=50 accrues once.
 	sync(200, 200, 200)
-	assertUpDown(t, readTraffic(t, db, email), 200, 200, "equal siblings accrue the single increment")
+	assertUpDown(t, readTraffic(t, db, email), 100, 100, "equal siblings accrue the single increment")
 }
 
 func TestDelClientStat_CleansNodeBaselines(t *testing.T) {