1
0
Эх сурвалжийг харах

fix(db): additional cross-DB and node traffic edge cases (migration scan + node reset time) (#5045)

* fix(db): additional cross-DB and node traffic edge cases

- ExternalProxy migration: change StreamSettings scan from []byte
  to string (text column on both SQLite and Postgres). Use []byte()
  for json.Unmarshal. Avoids potential scan/encoding differences
  in migration of old multi-domain data on PG.

- Node mirror inbound creation and updates in SetRemoteTraffic:
  now copy LastTrafficResetTime from the node's reported snapshot.
  Previously, resets done on the node (or via API that affects
  node-owned inbounds) would not update the grace period tracking
  on the central mirror. This improves traffic reset + node traffic
  combining accuracy when using the public API to manage node
  inbounds or when nodes perform resets.

These are independent additional issues around node traffic
combining, creating mirrored node inbounds from snapshots,
and migration code that can affect Postgres (or mixed) setups
after API changes or node operations. They do not depend on the
previous enable-merge or tag/sub fixes.

Base: upstream/main (separate PR).

* fix(db): even more node/SQLite edge cases (chunked IN for node stats, tag cleanup)

- In NodeService.GetAll (used for node list/stats): the load of client_traffics
  for node inbound IDs used a direct "IN ?" with all IDs. On SQLite this
  can hit the bind var limit ("too many SQL variables") when there are
  many nodes/inbounds. Chunked using the existing chunkInts + sqliteMaxVars
  (same pattern as other large IN queries in the package). This is a
  specific scale issue for "node" setups on SQLite (PG is fine with large IN).

- Tag cleanup raw in MigrationRequirements (always runs at startup):
  was using SQLite-only INSTR. Fixed to use position() on PG (same as
  the previous tag fix on the main branch). Prevents startup crash on PG
  after node/inbound API changes that leave old tags.

These are additional specific cases around node traffic/stats combining,
node inbound counts, and startup migrations that can affect Postgres
users or large SQLite node deployments. They are independent of the
enable/traffic core fixes and the prior additional ones.

Added to the clean additional-issues branch for the separate PR.

* fix(db): even more for node traffic merge on 5045 (dialect enable expr + chunk gone deletes)

- Full dialect-safe client enable merge in setRemoteTrafficLocked:
  - Added ClientTrafficEnableMergeExpr() helper (PG CASE with ::boolean
    casts to avoid type errors; SQLite numeric for affinity).
  - Updated GreatestExpr with ::bigint casts on PG.
  - Switched the merge UPDATE from "enable AND ?" to the helper.
  This completes the node traffic sync safety for the "only node can
  disable" logic across DBs (core of the original symptom after API
  inbound updates on nodes).

- Chunked the NodeClientTraffic delete for "goneEmails" (when a node's
  snapshot no longer includes clients previously attached to a mirrored
  inbound). The "email IN ?" could exceed SQLite bind limit for nodes
  with many clients (after API deletes, bulk ops, or structural changes).
  Uses chunkStrings + sqliteMaxVars (consistent with the node stats chunk
  we added earlier).

These are direct extensions of node traffic combining, mirrored inbound
lifecycle, and API-driven changes that affect client_traffics / NodeClientTraffic
for nodes. Stayed on the clean 5045 branch as requested.

Pushed to update https://github.com/MHSanaei/3x-ui/pull/5045

---------

Co-authored-by: Rqzbeh <[email protected]>
Rouzbeh† 12 цаг өмнө
parent
commit
94b8196e84

+ 0 - 20
database/dialect.go

@@ -2,9 +2,6 @@ package database
 
 import "fmt"
 
-// JSONClientsFromInbound returns the FROM clause that yields one row per element
-// of inbounds.settings -> clients, with a column named `client.value` whose text
-// fields can be read with JSONFieldText("client.value", "<key>").
 func JSONClientsFromInbound() string {
 	if IsPostgres() {
 		return "FROM inbounds, jsonb_array_elements(inbounds.settings::jsonb -> 'clients') AS client(value)"
@@ -27,26 +24,9 @@ func GreatestExpr(a, b string) string {
 	return fmt.Sprintf("MAX(%s, %s)", a, b)
 }
 
-// ClientTrafficEnableMergeExpr returns the SQL expression used in the
-// node traffic merge to update client_traffics.enable.
-//
-// The intent is: only allow the remote node to *disable* a client
-// (never re-enable one that the central panel has disabled).
-//
-// We use a dialect-specific expression because:
-// - On PostgreSQL we want strict boolean typing and casts to avoid
-//   "CASE types boolean and integer cannot be matched" errors
-//   (and similar internal expansions of AND/GREATEST).
-// - On SQLite, enable is stored with INTEGER affinity (0/1), there is
-//   no :: cast syntax, and we must produce a numeric-compatible result.
-//
-// The expression must be valid SQL for tx.Exec with a boolean parameter
-// as the first ?.
 func ClientTrafficEnableMergeExpr() string {
 	if IsPostgres() {
 		return "CASE WHEN ?::boolean THEN enable::boolean ELSE false END"
 	}
-	// SQLite: no :: casts. Use numeric CASE. 1/0 work as true/false
-	// thanks to SQLite's affinity and how GORM/drivers bind bools.
 	return "CASE WHEN ? THEN enable ELSE 0 END"
 }

+ 39 - 37
web/service/inbound.go

@@ -1665,23 +1665,24 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 				continue
 			}
 			newIb := model.Inbound{
-				UserId:         defaultUserId,
-				NodeID:         &nodeID,
-				OriginNodeGuid: originGuidFor(snapIb),
-				Tag:            chosenTag,
-				Listen:         snapIb.Listen,
-				Port:           snapIb.Port,
-				Protocol:       snapIb.Protocol,
-				Settings:       snapIb.Settings,
-				StreamSettings: snapIb.StreamSettings,
-				Sniffing:       snapIb.Sniffing,
-				TrafficReset:   snapIb.TrafficReset,
-				Enable:         snapIb.Enable,
-				Remark:         snapIb.Remark,
-				Total:          snapIb.Total,
-				ExpiryTime:     snapIb.ExpiryTime,
-				Up:             snapIb.Up,
-				Down:           snapIb.Down,
+				UserId:               defaultUserId,
+				NodeID:               &nodeID,
+				OriginNodeGuid:       originGuidFor(snapIb),
+				Tag:                  chosenTag,
+				Listen:               snapIb.Listen,
+				Port:                 snapIb.Port,
+				Protocol:             snapIb.Protocol,
+				Settings:             snapIb.Settings,
+				StreamSettings:       snapIb.StreamSettings,
+				Sniffing:             snapIb.Sniffing,
+				TrafficReset:         snapIb.TrafficReset,
+				LastTrafficResetTime: snapIb.LastTrafficResetTime,
+				Enable:               snapIb.Enable,
+				Remark:               snapIb.Remark,
+				Total:                snapIb.Total,
+				ExpiryTime:           snapIb.ExpiryTime,
+				Up:                   snapIb.Up,
+				Down:                 snapIb.Down,
 			}
 			if err := tx.Create(&newIb).Error; err != nil {
 				logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
@@ -1710,6 +1711,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			updates["stream_settings"] = snapIb.StreamSettings
 			updates["sniffing"] = snapIb.Sniffing
 			updates["traffic_reset"] = snapIb.TrafficReset
+			updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
 		}
 		if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
 			updates["up"] = snapIb.Up
@@ -1755,9 +1757,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 			return false, err
 		}
 		if len(goneEmails) > 0 {
-			if err := tx.Where("node_id = ? AND email IN ?", nodeID, goneEmails).
-				Delete(&model.NodeClientTraffic{}).Error; err != nil {
-				return false, err
+			// Chunk to avoid SQLite bind var limit when a node has many clients
+			// removed (e.g. after API bulk delete or structural change on node inbound).
+			for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
+				if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
+					Delete(&model.NodeClientTraffic{}).Error; err != nil {
+					return false, err
+				}
 			}
 		}
 		if err := tx.Where("inbound_id = ?", c.Id).
@@ -1836,18 +1842,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 				structuralChange = true
 			}
 
-			// Only allow the node to disable a client (cs.Enable=false), never
-			// to re-enable one the panel has already disabled. A stale snapshot
-			// from the node arriving after a central disable would otherwise
-			// overwrite enable=false back to true, letting the client accumulate
-			// far more traffic than their limit before being disabled again.
-			//
-			// We use a dialect-aware expression (see database.ClientTrafficEnableMergeExpr)
-			// because the old "enable AND ?" form (and naive CASE with :: casts)
-			// caused type mismatches on PostgreSQL after public API inbound updates
-			// (which go through updateClientTraffics + SyncInbound and can touch
-			// client_traffics rows) and would also break on SQLite due to PG-only
-			// ::boolean syntax.
 			enableExpr := database.ClientTrafficEnableMergeExpr()
 			if err := tx.Exec(
 				fmt.Sprintf(
@@ -3697,7 +3691,7 @@ func (s *InboundService) MigrationRequirements() {
 	var externalProxy []struct {
 		Id             int
 		Port           int
-		StreamSettings []byte
+		StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan
 	}
 	externalProxyQuery := `select id, port, stream_settings
 	from inbounds
@@ -3719,7 +3713,7 @@ func (s *InboundService) MigrationRequirements() {
 	for _, ep := range externalProxy {
 		var reverses any
 		var stream map[string]any
-		json.Unmarshal(ep.StreamSettings, &stream)
+		json.Unmarshal([]byte(ep.StreamSettings), &stream)
 		if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
 			if settings, ok := tlsSettings["settings"].(map[string]any); ok {
 				if domains, ok := settings["domains"].([]any); ok {
@@ -3741,9 +3735,17 @@ func (s *InboundService) MigrationRequirements() {
 		tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
 	}
 
-	err = tx.Raw(`UPDATE inbounds
-	SET tag = REPLACE(tag, '0.0.0.0:', '')
-	WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error
+	// Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-...").
+	// Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position().
+	tagCleanup := `UPDATE inbounds
+		SET tag = REPLACE(tag, '0.0.0.0:', '')
+		WHERE INSTR(tag, '0.0.0.0:') > 0;`
+	if database.IsPostgres() {
+		tagCleanup = `UPDATE inbounds
+			SET tag = REPLACE(tag, '0.0.0.0:', '')
+			WHERE position('0.0.0.0:' in tag) > 0;`
+	}
+	err = tx.Raw(tagCleanup).Error
 	if err != nil {
 		return
 	}

+ 23 - 14
web/service/node.go

@@ -226,11 +226,20 @@ func (s *NodeService) GetAll() ([]*model.Node, error) {
 	for id := range nodeByInbound {
 		inboundIDs = append(inboundIDs, id)
 	}
-	if err := db.Table("client_traffics").
-		Select("inbound_id, email, enable, total, up, down, expiry_time").
-		Where("inbound_id IN ?", inboundIDs).
-		Scan(&trafficRows).Error; err == nil {
-		depletedByNode := make(map[int]int)
+	// Chunk the IN clause to avoid "too many SQL variables" on SQLite
+	// when there are many node-owned inbounds (common with many nodes).
+	// sqliteMaxVars is defined in this package (inbound.go).
+	for _, batch := range chunkInts(inboundIDs, sqliteMaxVars) {
+		var page []trafficRow
+		if err := db.Table("client_traffics").
+			Select("inbound_id, email, enable, total, up, down, expiry_time").
+			Where("inbound_id IN ?", batch).
+			Scan(&page).Error; err == nil {
+			trafficRows = append(trafficRows, page...)
+		}
+	}
+	depletedByNode := make(map[int]int)
+	if len(trafficRows) > 0 {
 		for _, row := range trafficRows {
 			nodeID, ok := nodeByInbound[row.InboundID]
 			if !ok {
@@ -242,15 +251,15 @@ func (s *NodeService) GetAll() ([]*model.Node, error) {
 				depletedByNode[nodeID]++
 			}
 		}
-		onlineByGuid := s.onlineEmailsByGuid()
-		for _, n := range nodes {
-			n.InboundCount = len(inboundsByNode[n.Id])
-			n.DepletedCount = depletedByNode[n.Id]
-			// Online is attributed to the node that physically hosts the client
-			// (by GUID): a client on a sub-node counts under the sub-node, not
-			// the intermediate node it syncs through (#4983).
-			n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)])
-		}
+	}
+	onlineByGuid := s.onlineEmailsByGuid()
+	for _, n := range nodes {
+		n.InboundCount = len(inboundsByNode[n.Id])
+		n.DepletedCount = depletedByNode[n.Id]
+		// Online is attributed to the node that physically hosts the client
+		// (by GUID): a client on a sub-node counts under the sub-node, not
+		// the intermediate node it syncs through (#4983).
+		n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)])
 	}
 
 	return nodes, nil