瀏覽代碼

fix(node): stop Postgres deadlocks and deleted-client resurrection in node sync

Two defects in the node traffic sync, both hit hard on busy
master+multi-node Postgres deployments:

Client-IP merges deadlocked. Each node syncs on its own goroutine and
shared clients appear in several nodes' reports, but MergeInboundClientIps
and upsertNodeClientIps locked rows in whatever order each node's report
arrived. Two concurrent merges taking the same rows in opposite order is
exactly what Postgres aborts with SQLSTATE 40P01 ("merge client ips from
<node> failed: deadlock detected"). Both merges now process emails in
sorted order so every transaction acquires row locks in one global order.

Deleted clients resurrected with zeroed traffic. A snapshot fetched just
before a deletion still names the deleted email; applying it after the
delete committed re-added the client. The delete tombstone existed for
precisely this race but only zeroed the seed counters: the sync still
recreated the client_traffics row, and worse, adopted the node's stale
settings JSON wholesale, putting the client back in the central inbound
as if it were brand new with 0 traffic. Snapshot application now skips
row creation for tombstoned emails on known inbounds and strips
tombstoned clients from adopted settings; fresh node-adoption semantics
(rows seeded at zero) are unchanged.

The mass-disconnect part of the report is the forced node restart on
auto-disable, removed separately in 4d6f2ddd.

Closes #5739
MHSanaei 1 天之前
父節點
當前提交
9a3a12b260

+ 37 - 0
internal/web/service/client_locks.go

@@ -1,6 +1,7 @@
 package service
 
 import (
+	"encoding/json"
 	"sync"
 	"time"
 
@@ -139,3 +140,39 @@ func isClientEmailTombstoned(email string) bool {
 	}
 	return true
 }
+
+// stripTombstonedClients drops just-deleted client entries from a node
+// snapshot's settings JSON so adopting a stale snapshot can't re-add them to
+// the central inbound while the delete tombstone is live. Returns the filtered
+// JSON and whether anything was removed.
+func stripTombstonedClients(settings string) (string, bool) {
+	if settings == "" {
+		return settings, false
+	}
+	var parsed map[string]any
+	if err := json.Unmarshal([]byte(settings), &parsed); err != nil {
+		return settings, false
+	}
+	clients, _ := parsed["clients"].([]any)
+	if len(clients) == 0 {
+		return settings, false
+	}
+	kept := make([]any, 0, len(clients))
+	for _, c := range clients {
+		if cm, ok := c.(map[string]any); ok {
+			if email, _ := cm["email"].(string); email != "" && isClientEmailTombstoned(email) {
+				continue
+			}
+		}
+		kept = append(kept, c)
+	}
+	if len(kept) == len(clients) {
+		return settings, false
+	}
+	parsed["clients"] = kept
+	b, err := json.MarshalIndent(parsed, "", "  ")
+	if err != nil {
+		return settings, false
+	}
+	return string(b), true
+}

+ 8 - 0
internal/web/service/inbound_client_ips.go

@@ -79,6 +79,14 @@ func (s *InboundService) MergeInboundClientIps(incomingIps []model.InboundClient
 	now := time.Now().Unix()
 	cutoff := now - clientIpStaleAfterSeconds
 
+	// Node syncs run concurrently (one goroutine per node) and shared clients
+	// appear in several nodes' reports. Locking rows in each node's arbitrary
+	// report order lets two merges grab the same rows in opposite order, which
+	// Postgres aborts as a deadlock (40P01) — take them in one global order.
+	sort.Slice(incomingIps, func(i, j int) bool {
+		return incomingIps[i].ClientEmail < incomingIps[j].ClientEmail
+	})
+
 	tx := db.Begin()
 	defer func() {
 		if r := recover(); r != nil {

+ 20 - 3
internal/web/service/inbound_node.go

@@ -486,6 +486,14 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 
 		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
 
+		// Adopting the node's settings verbatim would re-add a client the master
+		// deleted moments ago if this snapshot was fetched before the deletion
+		// push landed — filter just-deleted emails out while their tombstone lives.
+		adoptedSettings := snapIb.Settings
+		if stripped, changed := stripTombstonedClients(adoptedSettings); changed {
+			adoptedSettings = stripped
+		}
+
 		updates := map[string]any{}
 		if !dirty {
 			updates["enable"] = snapIb.Enable
@@ -496,7 +504,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			updates["protocol"] = snapIb.Protocol
 			updates["total"] = snapIb.Total
 			updates["expiry_time"] = snapIb.ExpiryTime
-			updates["settings"] = snapIb.Settings
+			updates["settings"] = adoptedSettings
 			updates["stream_settings"] = snapIb.StreamSettings
 			updates["sniffing"] = snapIb.Sniffing
 			updates["traffic_reset"] = snapIb.TrafficReset
@@ -513,7 +521,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			updates["origin_node_guid"] = og
 		}
 
-		if !dirty && (c.Settings != snapIb.Settings ||
+		if !dirty && (c.Settings != adoptedSettings ||
 			c.Remark != snapIb.Remark ||
 			c.Listen != snapIb.Listen ||
 			c.Port != snapIb.Port ||
@@ -634,8 +642,17 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 				if dirty {
 					continue
 				}
+				_, isNewInbound := newInboundIDs[c.Id]
+				// On a known inbound a missing row plus a live tombstone means the
+				// master just deleted this client and the snapshot predates the
+				// deletion push — recreating the row (at zero) would resurrect the
+				// client. A freshly adopted inbound still gets its row (seeded at
+				// zero) so adoption semantics stay intact.
+				if !isNewInbound && isClientEmailTombstoned(cs.Email) {
+					continue
+				}
 				var seedUp, seedDown int64
-				if _, isNewInbound := newInboundIDs[c.Id]; isNewInbound && !isClientEmailTombstoned(cs.Email) {
+				if isNewInbound && !isClientEmailTombstoned(cs.Email) {
 					seedUp, seedDown = canon.Up, canon.Down
 				}
 				row := &xray.ClientTraffic{

+ 12 - 4
internal/web/service/inbound_node_ips.go

@@ -64,6 +64,16 @@ func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry)
 		existingByEmail[existing[i].Email] = &existing[i]
 	}
 
+	// Deterministic row order keeps concurrent guid merges from deadlocking on
+	// Postgres (40P01) — same discipline as MergeInboundClientIps.
+	emails := make([]string, 0, len(perEmail))
+	for email := range perEmail {
+		if email != "" {
+			emails = append(emails, email)
+		}
+	}
+	sort.Strings(emails)
+
 	tx := db.Begin()
 	defer func() {
 		if r := recover(); r != nil {
@@ -71,10 +81,8 @@ func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry)
 		}
 	}()
 
-	for email, incoming := range perEmail {
-		if email == "" {
-			continue
-		}
+	for _, email := range emails {
+		incoming := perEmail[email]
 		var old []model.ClientIpEntry
 		if cur, ok := existingByEmail[email]; ok && cur.Ips != "" {
 			_ = json.Unmarshal([]byte(cur.Ips), &old)

+ 42 - 0
internal/web/service/node_client_traffic_sum_test.go

@@ -3,6 +3,7 @@ package service
 import (
 	"fmt"
 	"path/filepath"
+	"strings"
 	"testing"
 
 	"gorm.io/gorm"
@@ -135,6 +136,47 @@ func TestNodeAdd_ImportsClientHistoryWithNewInbound(t *testing.T) {
 	assertUpDown(t, readTraffic(t, db, email), histUp+1024, histDown+2048, "post-import delta accrues, no double count")
 }
 
+// TestStaleSnapshot_DeletedClientNotResurrected reproduces #5739: a snapshot
+// fetched just before a client's deletion still names the email. Applying it
+// must neither recreate the client_traffics row (at zero) nor re-add the
+// client to the central inbound's settings while the delete tombstone lives.
+func TestStaleSnapshot_DeletedClientNotResurrected(t *testing.T) {
+	db := initTrafficTestDB(t)
+	createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "victim-5739")
+	svc := &InboundService{}
+
+	const email = "victim-5739"
+	withClient := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
+	syncNodeWithSettings(t, svc, 1, "n1-in", withClient, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
+
+	if err := db.Model(&model.Inbound{}).Where("tag = ?", "n1-in").
+		Update("settings", `{"clients": []}`).Error; err != nil {
+		t.Fatalf("clear central settings: %v", err)
+	}
+	if err := db.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
+		t.Fatalf("delete stats row: %v", err)
+	}
+	tombstoneClientEmail(email)
+
+	syncNodeWithSettings(t, svc, 1, "n1-in", withClient, xray.ClientTraffic{Email: email, Up: 120, Down: 120, Enable: true})
+
+	var rows int64
+	if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Count(&rows).Error; err != nil {
+		t.Fatalf("count stats rows: %v", err)
+	}
+	if rows != 0 {
+		t.Errorf("deleted client's stats row resurrected by stale snapshot (%d rows)", rows)
+	}
+
+	var ib model.Inbound
+	if err := db.Where("tag = ?", "n1-in").First(&ib).Error; err != nil {
+		t.Fatalf("load inbound: %v", err)
+	}
+	if strings.Contains(ib.Settings, email) {
+		t.Errorf("deleted client re-added to central settings: %s", ib.Settings)
+	}
+}
+
 func TestNodeAdd_TombstonedClientNotResurrected(t *testing.T) {
 	db := initTrafficTestDB(t)
 	svc := &InboundService{}