Преглед изворни кода

feat(clients): bulk-set XTLS flow from the Adjust dialog (#5524)

* feat(clients): bulk-set XTLS flow from the Adjust dialog

Add a "Set flow" dropdown to the bulk Adjust dialog so an admin can set or
clear the XTLS flow on all selected clients at once, alongside the existing
days/traffic bumps. Empty by default (no effect on save); "Disable" clears
flow, and the two vision values mirror the per-client credential tab.

Flow rides the existing inbound-JSON -> SyncInbound path (ClientRecord.Flow +
client_inbounds.flow_override), so no new endpoint, DB column, or migration.
Setting a vision flow is gated by inboundCanEnableTlsFlow: ineligible inbounds
are left untouched and reported as skipped; clearing is always allowed. A real
flow change requests an xray restart (local) or a node reconcile (remote).

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>

* fix(clients): keep days/traffic write when bulk flow is ineligible

Address review on the bulk-flow-adjust PR:

- Blocking: a client adjusted with both a days/traffic delta and a flow
  directive on a flow-ineligible inbound had the flow-ineligibility recorded
  into the same skip set that gates the ClientTraffic write, so the inbound
  JSON / ClientRecord advanced but ClientTraffic did not — divergent stores,
  and the client misreported as skipped. Track flow ineligibility in its own
  map (bulkInboundAdjustResult.flowIneligible) so it only feeds the final
  Skipped report and never suppresses the expiry/total persistence.
- Drop the broad delete(skippedReasons, email): flow reasons no longer enter
  skippedReasons, so honoring a flow can no longer erase an unrelated skip
  reason (unlimited expiry, a real persistence error on another inbound).
- Drop the inline comment block from ClientBulkAdjustModal.tsx (file had none);
  move the whitelist-sync note next to bulkFlowAllowed, the source of truth.
- Document the optional flow field in the bulkAdjust API-docs example
  (endpoints.ts) and regenerate openapi.json.
- Add a regression test covering days+flow on an ineligible inbound.

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
Rouzbeh† пре 14 часа
родитељ
комит
14de0557f9

+ 3 - 2
frontend/public/openapi.json

@@ -5476,7 +5476,7 @@
         "tags": [
           "Clients"
         ],
-        "summary": "Shift expiry and/or traffic quota for many clients in one call. addDays/addBytes may be negative. Clients with unlimited expiry (expiryTime=0) or unlimited traffic (totalGB=0) are skipped for the corresponding field — bulk extend never converts unlimited to limited. Returns the adjusted count and per-email skip reasons.",
+        "summary": "Shift expiry and/or traffic quota for many clients in one call. addDays/addBytes may be negative. Clients with unlimited expiry (expiryTime=0) or unlimited traffic (totalGB=0) are skipped for the corresponding field — bulk extend never converts unlimited to limited. The optional flow directive sets the XTLS flow on every client: \"none\" clears it, \"xtls-rprx-vision\"/\"xtls-rprx-vision-udp443\" set it where the inbound supports it (omit or \"\" to leave it unchanged). Returns the adjusted count and per-email skip reasons.",
         "operationId": "post_panel_api_clients_bulkAdjust",
         "requestBody": {
           "required": true,
@@ -5491,7 +5491,8 @@
                   "bob"
                 ],
                 "addDays": 30,
-                "addBytes": 53687091200
+                "addBytes": 53687091200,
+                "flow": "xtls-rprx-vision"
               }
             }
           }

+ 3 - 3
frontend/src/hooks/useClients.ts

@@ -341,7 +341,7 @@ export function useClients() {
   });
 
   const bulkAdjustMut = useMutation({
-    mutationFn: async (payload: { emails: string[]; addDays: number; addBytes: number }): Promise<Msg<BulkAdjustResult>> => {
+    mutationFn: async (payload: { emails: string[]; addDays: number; addBytes: number; flow: string }): Promise<Msg<BulkAdjustResult>> => {
       const raw = await HttpUtil.post('/panel/api/clients/bulkAdjust', payload, JSON_HEADERS);
       return parseMsg(raw, BulkAdjustResultSchema, 'clients/bulkAdjust');
     },
@@ -435,9 +435,9 @@ export function useClients() {
     if (!Array.isArray(payloads) || payloads.length === 0) return Promise.resolve(null as unknown as Msg<BulkCreateResult>);
     return bulkCreateMut.mutateAsync(payloads);
   }, [bulkCreateMut]);
-  const bulkAdjust = useCallback((emails: string[], addDays: number, addBytes: number) => {
+  const bulkAdjust = useCallback((emails: string[], addDays: number, addBytes: number, flow = '') => {
     if (!Array.isArray(emails) || emails.length === 0) return Promise.resolve(null);
-    return bulkAdjustMut.mutateAsync({ emails, addDays, addBytes });
+    return bulkAdjustMut.mutateAsync({ emails, addDays, addBytes, flow });
   }, [bulkAdjustMut]);
   const bulkAddToGroup = useCallback((emails: string[], group: string) => {
     if (!Array.isArray(emails) || emails.length === 0) return Promise.resolve(null);

+ 2 - 2
frontend/src/pages/api-docs/endpoints.ts

@@ -635,8 +635,8 @@ export const sections: readonly Section[] = [
       {
         method: 'POST',
         path: '/panel/api/clients/bulkAdjust',
-        summary: 'Shift expiry and/or traffic quota for many clients in one call. addDays/addBytes may be negative. Clients with unlimited expiry (expiryTime=0) or unlimited traffic (totalGB=0) are skipped for the corresponding field — bulk extend never converts unlimited to limited. Returns the adjusted count and per-email skip reasons.',
-        body: '{\n  "emails": ["alice", "bob"],\n  "addDays": 30,\n  "addBytes": 53687091200\n}',
+        summary: 'Shift expiry and/or traffic quota for many clients in one call. addDays/addBytes may be negative. Clients with unlimited expiry (expiryTime=0) or unlimited traffic (totalGB=0) are skipped for the corresponding field — bulk extend never converts unlimited to limited. The optional flow directive sets the XTLS flow on every client: "none" clears it, "xtls-rprx-vision"/"xtls-rprx-vision-udp443" set it where the inbound supports it (omit or "" to leave it unchanged). Returns the adjusted count and per-email skip reasons.',
+        body: '{\n  "emails": ["alice", "bob"],\n  "addDays": 30,\n  "addBytes": 53687091200,\n  "flow": "xtls-rprx-vision"\n}',
         response: '{\n  "success": true,\n  "obj": {\n    "adjusted": 2,\n    "skipped": [\n      { "email": "carol", "reason": "unlimited expiry" }\n    ]\n  }\n}',
       },
       {

+ 22 - 4
frontend/src/pages/clients/ClientBulkAdjustModal.tsx

@@ -1,16 +1,19 @@
 import { useEffect, useState } from 'react';
 import { useTranslation } from 'react-i18next';
-import { Alert, Form, InputNumber, Modal, message } from 'antd';
+import { Alert, Form, InputNumber, Modal, Select, message } from 'antd';
 
 import { ClientBulkAdjustFormSchema } from '@/schemas/client';
+import { TLS_FLOW_CONTROL } from '@/schemas/primitives/flow';
 
 const GB = 1024 * 1024 * 1024;
 
+const FLOW_CLEAR = 'none';
+
 interface ClientBulkAdjustModalProps {
   open: boolean;
   count: number;
   onOpenChange: (open: boolean) => void;
-  onSubmit: (addDays: number, addBytes: number) => Promise<{ adjusted: number; skipped?: { email: string; reason: string }[] } | null>;
+  onSubmit: (addDays: number, addBytes: number, flow: string) => Promise<{ adjusted: number; skipped?: { email: string; reason: string }[] } | null>;
 }
 
 export default function ClientBulkAdjustModal({ open, count, onOpenChange, onSubmit }: ClientBulkAdjustModalProps) {
@@ -18,12 +21,14 @@ export default function ClientBulkAdjustModal({ open, count, onOpenChange, onSub
   const [messageApi, messageContextHolder] = message.useMessage();
   const [addDays, setAddDays] = useState<number>(0);
   const [addGB, setAddGB] = useState<number>(0);
+  const [flow, setFlow] = useState<string>('');
   const [submitting, setSubmitting] = useState(false);
 
   useEffect(() => {
     if (open) {
       setAddDays(0);
       setAddGB(0);
+      setFlow('');
     }
   }, [open]);
 
@@ -31,16 +36,17 @@ export default function ClientBulkAdjustModal({ open, count, onOpenChange, onSub
     const validated = ClientBulkAdjustFormSchema.safeParse({
       addDays: Math.trunc(Number(addDays) || 0),
       addGB: Number(addGB) || 0,
+      flow,
     });
     if (!validated.success) {
       messageApi.warning(t(validated.error.issues[0]?.message ?? 'somethingWentWrong'));
       return;
     }
-    const { addDays: days, addGB: gb } = validated.data;
+    const { addDays: days, addGB: gb, flow: flowValue } = validated.data;
     setSubmitting(true);
     try {
       const bytes = Math.trunc(gb * GB);
-      const result = await onSubmit(days, bytes);
+      const result = await onSubmit(days, bytes, flowValue);
       if (!result) return;
       const ok = result.adjusted ?? 0;
       const skipped = result.skipped?.length ?? 0;
@@ -95,6 +101,18 @@ export default function ClientBulkAdjustModal({ open, count, onOpenChange, onSub
               step={1}
             />
           </Form.Item>
+          <Form.Item label={t('pages.clients.bulkFlow')}>
+            <Select
+              value={flow}
+              onChange={setFlow}
+              style={{ width: '100%' }}
+              options={[
+                { value: '', label: t('pages.clients.bulkFlowNoChange') },
+                { value: FLOW_CLEAR, label: t('pages.clients.bulkFlowDisable') },
+                ...Object.values(TLS_FLOW_CONTROL).map((k) => ({ value: k, label: k })),
+              ]}
+            />
+          </Form.Item>
         </Form>
       </Modal>
     </>

+ 2 - 2
frontend/src/pages/clients/ClientsPage.tsx

@@ -1418,8 +1418,8 @@ export default function ClientsPage() {
             open={bulkAdjustOpen}
             count={selectedRowKeys.length}
             onOpenChange={setBulkAdjustOpen}
-            onSubmit={async (addDays, addBytes) => {
-              const msg = await bulkAdjust([...selectedRowKeys], addDays, addBytes);
+            onSubmit={async (addDays, addBytes, flow) => {
+              const msg = await bulkAdjust([...selectedRowKeys], addDays, addBytes, flow);
               if (msg?.success) {
                 setSelectedRowKeys([]);
                 return msg.obj ?? { adjusted: 0 };

+ 2 - 1
frontend/src/schemas/client.ts

@@ -188,8 +188,9 @@ export const ClientBulkAdjustFormSchema = z
   .object({
     addDays: z.number().int(),
     addGB: z.number(),
+    flow: z.string().optional().default(''),
   })
-  .refine((v) => v.addDays !== 0 || v.addGB !== 0, {
+  .refine((v) => v.addDays !== 0 || v.addGB !== 0 || v.flow !== '', {
     message: 'pages.clients.bulkAdjustNothing',
   });
 

+ 2 - 1
internal/web/controller/client.go

@@ -248,6 +248,7 @@ type bulkAdjustRequest struct {
 	Emails   []string `json:"emails"`
 	AddDays  int      `json:"addDays"`
 	AddBytes int64    `json:"addBytes"`
+	Flow     string   `json:"flow"`
 }
 
 func (a *ClientController) bulkAdjust(c *gin.Context) {
@@ -256,7 +257,7 @@ func (a *ClientController) bulkAdjust(c *gin.Context) {
 		jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
 		return
 	}
-	result, needRestart, err := a.clientService.BulkAdjust(&a.inboundService, req.Emails, req.AddDays, req.AddBytes)
+	result, needRestart, err := a.clientService.BulkAdjust(&a.inboundService, req.Emails, req.AddDays, req.AddBytes, req.Flow)
 	if err != nil {
 		jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
 		return

+ 102 - 14
internal/web/service/client_bulk.go

@@ -242,6 +242,22 @@ type bulkAdjustEntry struct {
 	newTotal    int64
 }
 
+// bulkFlowClear is the directive that strips the XTLS flow from every selected
+// client. The vision values are the only positive flows xray accepts.
+const bulkFlowClear = "none"
+
+// bulkFlowAllowed whitelists the flow directives BulkAdjust accepts. Anything
+// outside this set is treated as "" (leave flow untouched) so a malformed or
+// hostile value can never be injected into a client's settings. The dropdown in
+// ClientBulkAdjustModal.tsx offers the same set ("" / "none" / TLS_FLOW_CONTROL);
+// keep the two in sync.
+var bulkFlowAllowed = map[string]struct{}{
+	"":                        {},
+	bulkFlowClear:             {},
+	"xtls-rprx-vision":        {},
+	"xtls-rprx-vision-udp443": {},
+}
+
 // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
 // for every email in the list. Clients whose corresponding field is
 // unlimited (0) are skipped — bulk extend should not accidentally
@@ -250,12 +266,17 @@ type bulkAdjustEntry struct {
 // Like BulkDelete, the work is grouped by inbound so each inbound's
 // settings JSON is parsed and written exactly once regardless of how
 // many target emails it contains.
-func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
+func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64, flow string) (BulkAdjustResult, bool, error) {
 	result := BulkAdjustResult{}
 	if len(emails) == 0 {
 		return result, false, nil
 	}
-	if addDays == 0 && addBytes == 0 {
+	flow = strings.TrimSpace(flow)
+	if _, ok := bulkFlowAllowed[flow]; !ok {
+		flow = "" // ignore unknown directives — "" means "leave flow untouched"
+	}
+	adjustFlow := flow != ""
+	if addDays == 0 && addBytes == 0 && !adjustFlow {
 		return result, false, common.NewError("no adjustment specified")
 	}
 
@@ -342,7 +363,7 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 				entry.newTotal = next
 			}
 		}
-		if entry.applyExpiry || entry.applyTotal {
+		if entry.applyExpiry || entry.applyTotal || adjustFlow {
 			plan[email] = entry
 		}
 	}
@@ -379,11 +400,19 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 	}
 
 	needRestart := false
+	flowHonored := map[string]bool{}
+	flowIneligible := map[string]bool{}
 	for inboundId, ibEmails := range emailsByInbound {
-		ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
+		ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan, flow)
 		if ibRes.needRestart {
 			needRestart = true
 		}
+		for email := range ibRes.flowHonored {
+			flowHonored[email] = true
+		}
+		for email := range ibRes.flowIneligible {
+			flowIneligible[email] = true
+		}
 		for email, reason := range ibRes.perEmailSkipped {
 			if _, already := skippedReasons[email]; !already {
 				skippedReasons[email] = reason
@@ -391,6 +420,7 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 		}
 	}
 
+	adjusted := map[string]struct{}{}
 	for email, entry := range plan {
 		if _, skipped := skippedReasons[email]; skipped {
 			continue
@@ -402,27 +432,49 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
 		if entry.applyTotal {
 			updates["total"] = entry.newTotal
 		}
-		if len(updates) == 0 {
-			continue
-		}
-		if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
-			if _, already := skippedReasons[email]; !already {
-				skippedReasons[email] = err.Error()
+		if len(updates) > 0 {
+			if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
+				if _, already := skippedReasons[email]; !already {
+					skippedReasons[email] = err.Error()
+				}
+				continue
 			}
-			continue
 		}
-		result.Adjusted++
+		// Counted when expiry/total changed, or a flow directive was honored
+		// for this client (flow lives in the inbound JSON, not ClientTraffic).
+		if len(updates) > 0 || flowHonored[email] {
+			adjusted[email] = struct{}{}
+		}
 	}
+	result.Adjusted = len(adjusted)
 
 	for email, reason := range skippedReasons {
 		result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
 	}
+	// Report a flow directive that no inbound could carry — only when it was not
+	// honored anywhere and the client has no other (expiry/total) skip reason.
+	// The expiry/total part, if any, has already been applied and counted above.
+	for email := range flowIneligible {
+		if flowHonored[email] {
+			continue
+		}
+		if _, already := skippedReasons[email]; already {
+			continue
+		}
+		result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "flow not supported on inbound"})
+	}
 	return result, needRestart, nil
 }
 
 type bulkInboundAdjustResult struct {
 	perEmailSkipped map[string]string
-	needRestart     bool
+	flowHonored     map[string]bool
+	// flowIneligible is tracked apart from perEmailSkipped: a flow directive
+	// that an inbound cannot carry must not suppress the expiry/total write for
+	// the same client (which would diverge the inbound JSON / ClientRecord from
+	// ClientTraffic). It only feeds the final Skipped report.
+	flowIneligible map[string]bool
+	needRestart    bool
 }
 
 // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
@@ -436,8 +488,9 @@ func (s *ClientService) bulkAdjustInboundClients(
 	inboundId int,
 	emails []string,
 	plan map[string]*bulkAdjustEntry,
+	flow string,
 ) bulkInboundAdjustResult {
-	res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
+	res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}, flowHonored: map[string]bool{}, flowIneligible: map[string]bool{}}
 
 	defer lockInbound(inboundId).Unlock()
 
@@ -469,8 +522,15 @@ func (s *ClientService) bulkAdjustInboundClients(
 		wantedEmails[email] = struct{}{}
 	}
 
+	// Flow eligibility is a property of the inbound (protocol + transport), so
+	// resolve it once. Clearing flow is always allowed; setting a vision flow
+	// is only honored on an inbound that can carry it.
+	flowEligible := flow == bulkFlowClear ||
+		inboundCanEnableTlsFlow(string(oldInbound.Protocol), oldInbound.StreamSettings, oldInbound.Settings)
+
 	interfaceClients, _ := settings["clients"].([]any)
 	foundEmails := map[string]bool{}
+	flowChanged := false
 	nowMs := time.Now().Unix() * 1000
 	for i, client := range interfaceClients {
 		c, ok := client.(map[string]any)
@@ -488,6 +548,23 @@ func (s *ClientService) bulkAdjustInboundClients(
 		if entry.applyTotal {
 			c["totalGB"] = entry.newTotal
 		}
+		if flow != "" {
+			if flowEligible {
+				want := ""
+				if flow != bulkFlowClear {
+					want = flow
+				}
+				if cur, _ := c["flow"].(string); cur != want {
+					c["flow"] = want
+					flowChanged = true
+				}
+				res.flowHonored[targetEmail] = true
+			} else {
+				// Record separately so this never suppresses the expiry/total
+				// write for the same client (see flowIneligible doc).
+				res.flowIneligible[targetEmail] = true
+			}
+		}
 		c["updated_at"] = nowMs
 		interfaceClients[i] = c
 		foundEmails[targetEmail] = true
@@ -513,6 +590,13 @@ func (s *ClientService) bulkAdjustInboundClients(
 	}
 	oldInbound.Settings = string(newSettings)
 
+	// A flow change rewrites the user's xray config, which the lightweight
+	// UpdateUser push below does not carry. Local nodes reload via restart;
+	// remote nodes get a full reconcile (MarkNodeDirty) instead of a per-user push.
+	if flowChanged && oldInbound.NodeID == nil {
+		res.needRestart = true
+	}
+
 	markDirty := false
 	if oldInbound.NodeID != nil {
 		rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
@@ -525,6 +609,10 @@ func (s *ClientService) bulkAdjustInboundClients(
 			if dirty {
 				markDirty = true
 			}
+			if flowChanged {
+				markDirty = true
+				push = false
+			}
 			// Large batches collapse into one reconcile push rather than M updates.
 			if push && len(foundEmails) > nodeBulkPushThreshold {
 				markDirty = true

+ 217 - 0
internal/web/service/client_bulk_flow_test.go

@@ -0,0 +1,217 @@
+package service
+
+import (
+	"testing"
+	"time"
+
+	"github.com/mhsanaei/3x-ui/v3/internal/database"
+	"github.com/mhsanaei/3x-ui/v3/internal/database/model"
+	"github.com/mhsanaei/3x-ui/v3/internal/xray"
+)
+
+// mkInboundStream is mkInbound with explicit stream settings, needed to make an
+// inbound flow-eligible (VLESS + tcp + reality/tls).
+func mkInboundStream(t *testing.T, port int, proto model.Protocol, settings, stream string) *model.Inbound {
+	t.Helper()
+	ib := &model.Inbound{
+		Tag:            string(proto) + "-stream-" + emailSafe(port),
+		Enable:         true,
+		Port:           port,
+		Protocol:       proto,
+		Settings:       settings,
+		StreamSettings: stream,
+	}
+	if err := database.GetDB().Create(ib).Error; err != nil {
+		t.Fatalf("create inbound %d: %v", port, err)
+	}
+	return ib
+}
+
+func emailSafe(port int) string {
+	return string(rune('a'+port%26)) + string(rune('a'+(port/26)%26))
+}
+
+func flowOf(t *testing.T, svc *ClientService, email string) string {
+	t.Helper()
+	rec, err := svc.GetRecordByEmail(nil, email)
+	if err != nil {
+		t.Fatalf("GetRecordByEmail(%q): %v", email, err)
+	}
+	return rec.Flow
+}
+
+const realityStream = `{"network":"tcp","security":"reality"}`
+const wsStream = `{"network":"ws","security":"none"}`
+
+// TestBulkAdjust_FlowSetAndClear covers the happy path: a vision flow is applied
+// on an eligible VLESS inbound and later cleared with the "none" directive. Both
+// transitions are real config changes, so they must request a restart.
+func TestBulkAdjust_FlowSetAndClear(t *testing.T) {
+	setupBulkDB(t)
+	svc := &ClientService{}
+	inboundSvc := &InboundService{}
+
+	clients := []model.Client{
+		{Email: "f1@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "f1", Enable: true},
+		{Email: "f2@x", ID: "22222222-2222-2222-2222-222222222222", SubID: "f2", Enable: true},
+	}
+	ib := mkInboundStream(t, 30001, model.VLESS, clientsSettings(t, clients), realityStream)
+	if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+	emails := emailsOf(clients)
+
+	// Set vision flow.
+	res, restart, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "xtls-rprx-vision-udp443")
+	if err != nil {
+		t.Fatalf("BulkAdjust set: %v", err)
+	}
+	if res.Adjusted != 2 {
+		t.Fatalf("expected 2 adjusted, got %d (skipped=%v)", res.Adjusted, res.Skipped)
+	}
+	if !restart {
+		t.Fatalf("setting flow should request a restart")
+	}
+	for _, e := range emails {
+		if got := flowOf(t, svc, e); got != "xtls-rprx-vision-udp443" {
+			t.Fatalf("%s flow = %q, want xtls-rprx-vision-udp443", e, got)
+		}
+	}
+
+	// Setting the same flow again is a no-op: honored (counted) but no restart.
+	if _, restart2, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "xtls-rprx-vision-udp443"); err != nil {
+		t.Fatalf("BulkAdjust idempotent: %v", err)
+	} else if restart2 {
+		t.Fatalf("re-setting identical flow should not request a restart")
+	}
+
+	// Clear flow.
+	cres, crestart, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "none")
+	if err != nil {
+		t.Fatalf("BulkAdjust clear: %v", err)
+	}
+	if cres.Adjusted != 2 {
+		t.Fatalf("expected 2 cleared, got %d (skipped=%v)", cres.Adjusted, cres.Skipped)
+	}
+	if !crestart {
+		t.Fatalf("clearing flow should request a restart")
+	}
+	for _, e := range emails {
+		if got := flowOf(t, svc, e); got != "" {
+			t.Fatalf("%s flow = %q, want empty after clear", e, got)
+		}
+	}
+}
+
+// TestBulkAdjust_FlowIneligibleSkipped verifies a vision flow is refused on an
+// inbound that cannot carry it (ws transport), reported as skipped, and the
+// client's flow is left untouched.
+func TestBulkAdjust_FlowIneligibleSkipped(t *testing.T) {
+	setupBulkDB(t)
+	svc := &ClientService{}
+	inboundSvc := &InboundService{}
+
+	clients := []model.Client{
+		{Email: "ws1@x", ID: "33333333-3333-3333-3333-333333333333", SubID: "ws1", Enable: true},
+	}
+	ib := mkInboundStream(t, 30101, model.VLESS, clientsSettings(t, clients), wsStream)
+	if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+
+	res, restart, err := svc.BulkAdjust(inboundSvc, []string{"ws1@x"}, 0, 0, "xtls-rprx-vision")
+	if err != nil {
+		t.Fatalf("BulkAdjust: %v", err)
+	}
+	if res.Adjusted != 0 {
+		t.Fatalf("ineligible inbound should adjust nothing, got %d", res.Adjusted)
+	}
+	if restart {
+		t.Fatalf("no change should not request a restart")
+	}
+	if len(res.Skipped) != 1 || res.Skipped[0].Email != "ws1@x" {
+		t.Fatalf("expected ws1@x in skipped, got %v", res.Skipped)
+	}
+	if got := flowOf(t, svc, "ws1@x"); got != "" {
+		t.Fatalf("flow should stay empty on ineligible inbound, got %q", got)
+	}
+}
+
+// TestBulkAdjust_NoDirectiveErrors guards the relaxed precondition: with no
+// days, traffic, or flow set there is nothing to do.
+func TestBulkAdjust_NoDirectiveErrors(t *testing.T) {
+	setupBulkDB(t)
+	svc := &ClientService{}
+	inboundSvc := &InboundService{}
+
+	if _, _, err := svc.BulkAdjust(inboundSvc, []string{"any@x"}, 0, 0, ""); err == nil {
+		t.Fatalf("expected error when no adjustment is specified")
+	}
+	// An unknown flow directive is ignored (treated as ""), so it also errors.
+	if _, _, err := svc.BulkAdjust(inboundSvc, []string{"any@x"}, 0, 0, "bogus-flow"); err == nil {
+		t.Fatalf("unknown flow should be ignored and error like an empty directive")
+	}
+}
+
+// TestBulkAdjust_DaysApplyDespiteIneligibleFlow is the regression for the review
+// blocker: when a client on a flow-ineligible inbound is adjusted with BOTH a
+// days/traffic delta AND a flow directive, the days/traffic change must still be
+// persisted to ClientTraffic (not just the inbound JSON / ClientRecord) and the
+// client must count as adjusted, while the unhonored flow is reported separately.
+func TestBulkAdjust_DaysApplyDespiteIneligibleFlow(t *testing.T) {
+	setupBulkDB(t)
+	svc := &ClientService{}
+	inboundSvc := &InboundService{}
+
+	const day = int64(24 * 60 * 60 * 1000)
+	const gb = int64(1) << 30
+	baseExpiry := time.Now().UnixMilli() + 30*day
+	baseTotal := 10 * gb
+
+	clients := []model.Client{
+		{Email: "mix@x", ID: "44444444-4444-4444-4444-444444444444", SubID: "mix", Enable: true, ExpiryTime: baseExpiry, TotalGB: baseTotal},
+	}
+	ib := mkInboundStream(t, 30201, model.VLESS, clientsSettings(t, clients), wsStream)
+	if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+	// ClientTraffic is the store the enforcement job reads; seed it to match.
+	if err := database.GetDB().Create(&xray.ClientTraffic{Email: "mix@x", Enable: true, ExpiryTime: baseExpiry, Total: baseTotal}).Error; err != nil {
+		t.Fatalf("seed traffic: %v", err)
+	}
+
+	res, _, err := svc.BulkAdjust(inboundSvc, []string{"mix@x"}, 7, gb, "xtls-rprx-vision")
+	if err != nil {
+		t.Fatalf("BulkAdjust: %v", err)
+	}
+	if res.Adjusted != 1 {
+		t.Fatalf("days/traffic should still be applied: Adjusted=%d skipped=%v", res.Adjusted, res.Skipped)
+	}
+	if len(res.Skipped) != 1 || res.Skipped[0].Email != "mix@x" {
+		t.Fatalf("expected mix@x reported for the unhonored flow, got %v", res.Skipped)
+	}
+
+	wantExpiry := baseExpiry + 7*day
+	wantTotal := baseTotal + gb
+
+	// ClientRecord (inbound-derived) advanced.
+	if rec, err := svc.GetRecordByEmail(nil, "mix@x"); err != nil {
+		t.Fatalf("record: %v", err)
+	} else if rec.ExpiryTime != wantExpiry || rec.TotalGB != wantTotal {
+		t.Fatalf("ClientRecord not advanced: expiry=%d total=%d", rec.ExpiryTime, rec.TotalGB)
+	}
+
+	// ClientTraffic advanced in lockstep — no divergence.
+	var ct xray.ClientTraffic
+	if err := database.GetDB().Where("email = ?", "mix@x").First(&ct).Error; err != nil {
+		t.Fatalf("traffic row: %v", err)
+	}
+	if ct.ExpiryTime != wantExpiry || ct.Total != wantTotal {
+		t.Fatalf("ClientTraffic diverged: expiry=%d total=%d, want expiry=%d total=%d", ct.ExpiryTime, ct.Total, wantExpiry, wantTotal)
+	}
+
+	// Flow left untouched on the ineligible inbound.
+	if got := flowOf(t, svc, "mix@x"); got != "" {
+		t.Fatalf("flow should stay empty on ineligible inbound, got %q", got)
+	}
+}

+ 1 - 1
internal/web/service/sync_scale_postgres_test.go

@@ -346,7 +346,7 @@ func TestBulkOpsPostgresScale(t *testing.T) {
 			}
 
 			t0 := time.Now()
-			if _, _, err := svc.BulkAdjust(inboundSvc, emailsM, 7, 1<<30); err != nil {
+			if _, _, err := svc.BulkAdjust(inboundSvc, emailsM, 7, 1<<30, ""); err != nil {
 				t.Fatalf("BulkAdjust: %v", err)
 			}
 			adjustDur := time.Since(t0)

+ 4 - 1
internal/web/translation/en-US.json

@@ -831,9 +831,12 @@
       "bulkDeleteConfirmContent": "Each selected client is removed from every attached inbound and its traffic record is dropped. This cannot be undone.",
       "bulkAdjustTitle": "Adjust {count} clients",
       "bulkAdjustHint": "Positive values extend, negative values reduce. Clients with unlimited expiry or traffic are skipped for that field.",
-      "bulkAdjustNothing": "Set days or traffic before applying.",
+      "bulkAdjustNothing": "Set days, traffic, or flow before applying.",
       "addDays": "Add days",
       "addTrafficGB": "Add traffic (GB)",
+      "bulkFlow": "Set flow",
+      "bulkFlowNoChange": "No change",
+      "bulkFlowDisable": "Disable (clear flow)",
       "delDepleted": "Delete depleted",
       "delDepletedConfirmTitle": "Delete depleted clients?",
       "delDepletedConfirmContent": "Removes every client whose traffic quota is exhausted or whose expiry has passed. This cannot be undone.",