|
@@ -242,6 +242,22 @@ type bulkAdjustEntry struct {
|
|
|
newTotal int64
|
|
newTotal int64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// bulkFlowClear is the directive that strips the XTLS flow from every selected
|
|
|
|
|
+// client. The vision values are the only positive flows xray accepts.
|
|
|
|
|
+const bulkFlowClear = "none"
|
|
|
|
|
+
|
|
|
|
|
+// bulkFlowAllowed whitelists the flow directives BulkAdjust accepts. Anything
|
|
|
|
|
+// outside this set is treated as "" (leave flow untouched) so a malformed or
|
|
|
|
|
+// hostile value can never be injected into a client's settings. The dropdown in
|
|
|
|
|
+// ClientBulkAdjustModal.tsx offers the same set ("" / "none" / TLS_FLOW_CONTROL);
|
|
|
|
|
+// keep the two in sync.
|
|
|
|
|
+var bulkFlowAllowed = map[string]struct{}{
|
|
|
|
|
+ "": {},
|
|
|
|
|
+ bulkFlowClear: {},
|
|
|
|
|
+ "xtls-rprx-vision": {},
|
|
|
|
|
+ "xtls-rprx-vision-udp443": {},
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
|
|
// BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
|
|
|
// for every email in the list. Clients whose corresponding field is
|
|
// for every email in the list. Clients whose corresponding field is
|
|
|
// unlimited (0) are skipped — bulk extend should not accidentally
|
|
// unlimited (0) are skipped — bulk extend should not accidentally
|
|
@@ -250,12 +266,17 @@ type bulkAdjustEntry struct {
|
|
|
// Like BulkDelete, the work is grouped by inbound so each inbound's
|
|
// Like BulkDelete, the work is grouped by inbound so each inbound's
|
|
|
// settings JSON is parsed and written exactly once regardless of how
|
|
// settings JSON is parsed and written exactly once regardless of how
|
|
|
// many target emails it contains.
|
|
// many target emails it contains.
|
|
|
-func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
|
|
|
|
|
|
|
+func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64, flow string) (BulkAdjustResult, bool, error) {
|
|
|
result := BulkAdjustResult{}
|
|
result := BulkAdjustResult{}
|
|
|
if len(emails) == 0 {
|
|
if len(emails) == 0 {
|
|
|
return result, false, nil
|
|
return result, false, nil
|
|
|
}
|
|
}
|
|
|
- if addDays == 0 && addBytes == 0 {
|
|
|
|
|
|
|
+ flow = strings.TrimSpace(flow)
|
|
|
|
|
+ if _, ok := bulkFlowAllowed[flow]; !ok {
|
|
|
|
|
+ flow = "" // ignore unknown directives — "" means "leave flow untouched"
|
|
|
|
|
+ }
|
|
|
|
|
+ adjustFlow := flow != ""
|
|
|
|
|
+ if addDays == 0 && addBytes == 0 && !adjustFlow {
|
|
|
return result, false, common.NewError("no adjustment specified")
|
|
return result, false, common.NewError("no adjustment specified")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -342,7 +363,7 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
entry.newTotal = next
|
|
entry.newTotal = next
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- if entry.applyExpiry || entry.applyTotal {
|
|
|
|
|
|
|
+ if entry.applyExpiry || entry.applyTotal || adjustFlow {
|
|
|
plan[email] = entry
|
|
plan[email] = entry
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -379,11 +400,19 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
needRestart := false
|
|
needRestart := false
|
|
|
|
|
+ flowHonored := map[string]bool{}
|
|
|
|
|
+ flowIneligible := map[string]bool{}
|
|
|
for inboundId, ibEmails := range emailsByInbound {
|
|
for inboundId, ibEmails := range emailsByInbound {
|
|
|
- ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
|
|
|
|
|
|
|
+ ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan, flow)
|
|
|
if ibRes.needRestart {
|
|
if ibRes.needRestart {
|
|
|
needRestart = true
|
|
needRestart = true
|
|
|
}
|
|
}
|
|
|
|
|
+ for email := range ibRes.flowHonored {
|
|
|
|
|
+ flowHonored[email] = true
|
|
|
|
|
+ }
|
|
|
|
|
+ for email := range ibRes.flowIneligible {
|
|
|
|
|
+ flowIneligible[email] = true
|
|
|
|
|
+ }
|
|
|
for email, reason := range ibRes.perEmailSkipped {
|
|
for email, reason := range ibRes.perEmailSkipped {
|
|
|
if _, already := skippedReasons[email]; !already {
|
|
if _, already := skippedReasons[email]; !already {
|
|
|
skippedReasons[email] = reason
|
|
skippedReasons[email] = reason
|
|
@@ -391,6 +420,7 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ adjusted := map[string]struct{}{}
|
|
|
for email, entry := range plan {
|
|
for email, entry := range plan {
|
|
|
if _, skipped := skippedReasons[email]; skipped {
|
|
if _, skipped := skippedReasons[email]; skipped {
|
|
|
continue
|
|
continue
|
|
@@ -402,27 +432,49 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
|
|
|
if entry.applyTotal {
|
|
if entry.applyTotal {
|
|
|
updates["total"] = entry.newTotal
|
|
updates["total"] = entry.newTotal
|
|
|
}
|
|
}
|
|
|
- if len(updates) == 0 {
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
|
|
|
|
|
- if _, already := skippedReasons[email]; !already {
|
|
|
|
|
- skippedReasons[email] = err.Error()
|
|
|
|
|
|
|
+ if len(updates) > 0 {
|
|
|
|
|
+ if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
|
|
|
|
|
+ if _, already := skippedReasons[email]; !already {
|
|
|
|
|
+ skippedReasons[email] = err.Error()
|
|
|
|
|
+ }
|
|
|
|
|
+ continue
|
|
|
}
|
|
}
|
|
|
- continue
|
|
|
|
|
}
|
|
}
|
|
|
- result.Adjusted++
|
|
|
|
|
|
|
+ // Counted when expiry/total changed, or a flow directive was honored
|
|
|
|
|
+ // for this client (flow lives in the inbound JSON, not ClientTraffic).
|
|
|
|
|
+ if len(updates) > 0 || flowHonored[email] {
|
|
|
|
|
+ adjusted[email] = struct{}{}
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+ result.Adjusted = len(adjusted)
|
|
|
|
|
|
|
|
for email, reason := range skippedReasons {
|
|
for email, reason := range skippedReasons {
|
|
|
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
|
|
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
|
|
|
}
|
|
}
|
|
|
|
|
+ // Report a flow directive that no inbound could carry — only when it was not
|
|
|
|
|
+ // honored anywhere and the client has no other (expiry/total) skip reason.
|
|
|
|
|
+ // The expiry/total part, if any, has already been applied and counted above.
|
|
|
|
|
+ for email := range flowIneligible {
|
|
|
|
|
+ if flowHonored[email] {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, already := skippedReasons[email]; already {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "flow not supported on inbound"})
|
|
|
|
|
+ }
|
|
|
return result, needRestart, nil
|
|
return result, needRestart, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type bulkInboundAdjustResult struct {
|
|
type bulkInboundAdjustResult struct {
|
|
|
perEmailSkipped map[string]string
|
|
perEmailSkipped map[string]string
|
|
|
- needRestart bool
|
|
|
|
|
|
|
+ flowHonored map[string]bool
|
|
|
|
|
+ // flowIneligible is tracked apart from perEmailSkipped: a flow directive
|
|
|
|
|
+ // that an inbound cannot carry must not suppress the expiry/total write for
|
|
|
|
|
+ // the same client (which would diverge the inbound JSON / ClientRecord from
|
|
|
|
|
+ // ClientTraffic). It only feeds the final Skipped report.
|
|
|
|
|
+ flowIneligible map[string]bool
|
|
|
|
|
+ needRestart bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// bulkAdjustInboundClients applies expiry/total deltas to multiple clients
|
|
// bulkAdjustInboundClients applies expiry/total deltas to multiple clients
|
|
@@ -436,8 +488,9 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
inboundId int,
|
|
inboundId int,
|
|
|
emails []string,
|
|
emails []string,
|
|
|
plan map[string]*bulkAdjustEntry,
|
|
plan map[string]*bulkAdjustEntry,
|
|
|
|
|
+ flow string,
|
|
|
) bulkInboundAdjustResult {
|
|
) bulkInboundAdjustResult {
|
|
|
- res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
|
|
|
|
|
|
|
+ res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}, flowHonored: map[string]bool{}, flowIneligible: map[string]bool{}}
|
|
|
|
|
|
|
|
defer lockInbound(inboundId).Unlock()
|
|
defer lockInbound(inboundId).Unlock()
|
|
|
|
|
|
|
@@ -469,8 +522,15 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
wantedEmails[email] = struct{}{}
|
|
wantedEmails[email] = struct{}{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Flow eligibility is a property of the inbound (protocol + transport), so
|
|
|
|
|
+ // resolve it once. Clearing flow is always allowed; setting a vision flow
|
|
|
|
|
+ // is only honored on an inbound that can carry it.
|
|
|
|
|
+ flowEligible := flow == bulkFlowClear ||
|
|
|
|
|
+ inboundCanEnableTlsFlow(string(oldInbound.Protocol), oldInbound.StreamSettings, oldInbound.Settings)
|
|
|
|
|
+
|
|
|
interfaceClients, _ := settings["clients"].([]any)
|
|
interfaceClients, _ := settings["clients"].([]any)
|
|
|
foundEmails := map[string]bool{}
|
|
foundEmails := map[string]bool{}
|
|
|
|
|
+ flowChanged := false
|
|
|
nowMs := time.Now().Unix() * 1000
|
|
nowMs := time.Now().Unix() * 1000
|
|
|
for i, client := range interfaceClients {
|
|
for i, client := range interfaceClients {
|
|
|
c, ok := client.(map[string]any)
|
|
c, ok := client.(map[string]any)
|
|
@@ -488,6 +548,23 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
if entry.applyTotal {
|
|
if entry.applyTotal {
|
|
|
c["totalGB"] = entry.newTotal
|
|
c["totalGB"] = entry.newTotal
|
|
|
}
|
|
}
|
|
|
|
|
+ if flow != "" {
|
|
|
|
|
+ if flowEligible {
|
|
|
|
|
+ want := ""
|
|
|
|
|
+ if flow != bulkFlowClear {
|
|
|
|
|
+ want = flow
|
|
|
|
|
+ }
|
|
|
|
|
+ if cur, _ := c["flow"].(string); cur != want {
|
|
|
|
|
+ c["flow"] = want
|
|
|
|
|
+ flowChanged = true
|
|
|
|
|
+ }
|
|
|
|
|
+ res.flowHonored[targetEmail] = true
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // Record separately so this never suppresses the expiry/total
|
|
|
|
|
+ // write for the same client (see flowIneligible doc).
|
|
|
|
|
+ res.flowIneligible[targetEmail] = true
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
c["updated_at"] = nowMs
|
|
c["updated_at"] = nowMs
|
|
|
interfaceClients[i] = c
|
|
interfaceClients[i] = c
|
|
|
foundEmails[targetEmail] = true
|
|
foundEmails[targetEmail] = true
|
|
@@ -513,6 +590,13 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
}
|
|
}
|
|
|
oldInbound.Settings = string(newSettings)
|
|
oldInbound.Settings = string(newSettings)
|
|
|
|
|
|
|
|
|
|
+ // A flow change rewrites the user's xray config, which the lightweight
|
|
|
|
|
+ // UpdateUser push below does not carry. Local nodes reload via restart;
|
|
|
|
|
+ // remote nodes get a full reconcile (MarkNodeDirty) instead of a per-user push.
|
|
|
|
|
+ if flowChanged && oldInbound.NodeID == nil {
|
|
|
|
|
+ res.needRestart = true
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
markDirty := false
|
|
markDirty := false
|
|
|
if oldInbound.NodeID != nil {
|
|
if oldInbound.NodeID != nil {
|
|
|
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
|
|
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
|
|
@@ -525,6 +609,10 @@ func (s *ClientService) bulkAdjustInboundClients(
|
|
|
if dirty {
|
|
if dirty {
|
|
|
markDirty = true
|
|
markDirty = true
|
|
|
}
|
|
}
|
|
|
|
|
+ if flowChanged {
|
|
|
|
|
+ markDirty = true
|
|
|
|
|
+ push = false
|
|
|
|
|
+ }
|
|
|
// Large batches collapse into one reconcile push rather than M updates.
|
|
// Large batches collapse into one reconcile push rather than M updates.
|
|
|
if push && len(foundEmails) > nodeBulkPushThreshold {
|
|
if push && len(foundEmails) > nodeBulkPushThreshold {
|
|
|
markDirty = true
|
|
markDirty = true
|