Ver Fonte

fix(node): stop one rejected inbound from starving a node's traffic sync

A legacy socks inbound (predating the socks-to-mixed protocol rename) fails the node's request validation when pushed. ReconcileNode aborted on the first failed inbound and syncOne then skipped the traffic snapshot entirely and never cleared ConfigDirty, so the whole node re-failed every tick and the master stopped deducting traffic for every client on that node, exactly as reported in #5685.

Three-part fix: ReconcileNode now pushes every inbound and runs the delete sweep even past individual failures, returning the failures joined; syncOne logs a failed reconcile but continues with the traffic pull (dirty stays set, so reconcile retries and the merge stays in its conservative mode); and a migration renames legacy socks inbounds to mixed, which has an identical settings shape, removing the known trigger.

Closes #5685
MHSanaei há 6 horas atrás
pai
commit
d105b2741c

+ 20 - 0
internal/database/db.go

@@ -113,6 +113,9 @@ func initModels() error {
 	if err := normalizeInboundSubSortIndex(); err != nil {
 		return err
 	}
+	if err := migrateLegacySocksInboundsToMixed(); err != nil {
+		return err
+	}
 	if IsPostgres() {
 		if err := resyncPostgresSequences(db, models); err != nil {
 			log.Printf("Error resyncing postgres sequences: %v", err)
@@ -483,6 +486,23 @@ func pruneOrphanedClientInbounds() error {
 	return nil
 }
 
+// migrateLegacySocksInboundsToMixed renames legacy socks inbounds to mixed.
+// The protocol enum dropped socks in favor of mixed (identical settings shape,
+// same behavior plus HTTP on the shared port), so rows predating the rename
+// fail model validation — most visibly when pushed to a node, where one legacy
+// inbound stalled the entire node's config and traffic sync (#5685).
+func migrateLegacySocksInboundsToMixed() error {
+	res := db.Exec("UPDATE inbounds SET protocol = 'mixed' WHERE protocol = 'socks'")
+	if res.Error != nil {
+		log.Printf("Error migrating legacy socks inbounds to mixed: %v", res.Error)
+		return res.Error
+	}
+	if res.RowsAffected > 0 {
+		log.Printf("Migrated %d legacy socks inbound(s) to mixed", res.RowsAffected)
+	}
+	return nil
+}
+
 // normalizeInboundSubSortIndex lifts sub_sort_index values below the 1-based
 // minimum (rows written by builds that defaulted the column to 0, or by nodes
 // predating the field) so they cannot sort ahead of explicitly ranked inbounds.

+ 9 - 6
internal/web/job/node_traffic_sync_job.go

@@ -368,13 +368,16 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
 		reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n)
 		reconcileCancel()
 		if reconcileErr != nil {
-			logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr)
-			return nil
-		}
-		if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
-			logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
+			// The dirty flag stays set so reconcile retries next tick, but traffic
+			// accounting must keep flowing: one rejected inbound used to starve the
+			// whole node's traffic/online sync forever (#5685).
+			logger.Warningf("node traffic sync: reconcile for %s failed, continuing with traffic pull: %v", n.Name, reconcileErr)
+		} else {
+			if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
+				logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
+			}
+			j.structural.set()
 		}
-		j.structural.set()
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)

+ 8 - 3
internal/web/service/inbound_node.go

@@ -82,6 +82,10 @@ func (s *InboundService) AnyNodePending(inboundIds []int) bool {
 	return false
 }
 
+// ReconcileNode pushes every inbound and sweeps undesired remote tags even when
+// individual operations fail, returning the failures joined: one inbound the
+// node rejects (e.g. a legacy protocol failing validation, #5685) must not
+// stall the rest of the node's config — or, via syncOne, its traffic sync.
 func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, n *model.Node) error {
 	if rt == nil || n == nil || n.Id <= 0 {
 		return nil
@@ -102,6 +106,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote,
 	}
 	prefix := nodeTagPrefix(&nodeID)
 	desiredTags := make(map[string]struct{}, len(inbounds)*2)
+	var errs []error
 	for _, ib := range inbounds {
 		desiredTags[ib.Tag] = struct{}{}
 		// existsOnNode: does the node already report this inbound under any of the
@@ -121,7 +126,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote,
 			}
 		}
 		if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil {
-			return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
+			errs = append(errs, fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err))
 		}
 	}
 	// In "selected" sync mode the panel only manages the selected tags: the
@@ -145,10 +150,10 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote,
 			}
 		}
 		if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
-			return fmt.Errorf("reconcile delete %q: %w", tag, err)
+			errs = append(errs, fmt.Errorf("reconcile delete %q: %w", tag, err))
 		}
 	}
-	return nil
+	return errors.Join(errs...)
 }
 
 const resetGracePeriodMs int64 = 30000

+ 82 - 0
internal/web/service/inbound_node_reconcile_test.go

@@ -143,6 +143,88 @@ func TestReconcileNode_AllModeDeletesUndesiredRemoteInbounds(t *testing.T) {
 	}
 }
 
+// One inbound the node rejects (e.g. a legacy protocol failing the node's
+// request validation, #5685) must not abort the reconcile: the healthy inbound
+// is still pushed, the delete sweep still runs, and the returned error names
+// the failed tag so the caller keeps the dirty flag set for retry.
+func TestReconcileNode_ContinuesPastFailedInbound(t *testing.T) {
+	setupConflictDB(t)
+
+	var mu sync.Mutex
+	updated := map[int]int{}
+	var deleted []int
+	tagToID := map[string]int{"legacy": 1, "healthy": 2, "gone": 3}
+	writeOK := func(w http.ResponseWriter, obj any) {
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(map[string]any{"success": true, "msg": "", "obj": obj})
+	}
+	mux := http.NewServeMux()
+	mux.HandleFunc("/panel/api/inbounds/list", func(w http.ResponseWriter, _ *http.Request) {
+		type row struct {
+			Id  int    `json:"id"`
+			Tag string `json:"tag"`
+		}
+		rows := make([]row, 0, len(tagToID))
+		for tag, id := range tagToID {
+			rows = append(rows, row{Id: id, Tag: tag})
+		}
+		writeOK(w, rows)
+	})
+	mux.HandleFunc("/panel/api/inbounds/update/", func(w http.ResponseWriter, r *http.Request) {
+		id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/update/"))
+		if err != nil {
+			http.Error(w, "bad id", http.StatusBadRequest)
+			return
+		}
+		if id == tagToID["legacy"] {
+			http.Error(w, "request body failed validation", http.StatusBadRequest)
+			return
+		}
+		mu.Lock()
+		updated[id]++
+		mu.Unlock()
+		writeOK(w, nil)
+	})
+	mux.HandleFunc("/panel/api/inbounds/del/", func(w http.ResponseWriter, r *http.Request) {
+		id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/del/"))
+		if err != nil {
+			http.Error(w, "bad id", http.StatusBadRequest)
+			return
+		}
+		mu.Lock()
+		deleted = append(deleted, id)
+		mu.Unlock()
+		writeOK(w, nil)
+	})
+	ts := httptest.NewServer(mux)
+	t.Cleanup(ts.Close)
+
+	node := reconcileTestNode(t, ts, "half-broken-node", "all", nil)
+	seedInboundConflictNode(t, "legacy", "", 1080, model.Protocol("socks"), ``, `{"auth":"noauth"}`, &node.Id)
+	seedInboundConflictNode(t, "healthy", "", 443, model.VLESS, `{"network":"tcp"}`, `{"clients":[]}`, &node.Id)
+
+	svc := InboundService{}
+	err := svc.ReconcileNode(context.Background(), runtime.NewRemote(node, nil), node)
+	if err == nil {
+		t.Fatal("ReconcileNode: want an error naming the rejected inbound, got nil")
+	}
+	if !strings.Contains(err.Error(), `reconcile inbound "legacy"`) {
+		t.Fatalf("ReconcileNode error = %q, want it to name inbound \"legacy\"", err)
+	}
+
+	mu.Lock()
+	healthyPushes := updated[tagToID["healthy"]]
+	gotDeleted := append([]int(nil), deleted...)
+	mu.Unlock()
+	if healthyPushes != 1 {
+		t.Fatalf("healthy inbound pushed %d times, want 1", healthyPushes)
+	}
+	sort.Ints(gotDeleted)
+	if len(gotDeleted) != 1 || gotDeleted[0] != tagToID["gone"] {
+		t.Fatalf("deleted remote ids = %v, want [%d] (sweep must still run past the failure)", gotDeleted, tagToID["gone"])
+	}
+}
+
 func TestEnsureInboundTagAllowed(t *testing.T) {
 	setupConflictDB(t)
 	db := database.GetDB()