1
0

client_bulk_flow_test.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package service
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/mhsanaei/3x-ui/v3/internal/database"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  8. )
  9. // mkInboundStream is mkInbound with explicit stream settings, needed to make an
  10. // inbound flow-eligible (VLESS + tcp + reality/tls).
  11. func mkInboundStream(t *testing.T, port int, proto model.Protocol, settings, stream string) *model.Inbound {
  12. t.Helper()
  13. ib := &model.Inbound{
  14. Tag: string(proto) + "-stream-" + emailSafe(port),
  15. Enable: true,
  16. Port: port,
  17. Protocol: proto,
  18. Settings: settings,
  19. StreamSettings: stream,
  20. }
  21. if err := database.GetDB().Create(ib).Error; err != nil {
  22. t.Fatalf("create inbound %d: %v", port, err)
  23. }
  24. return ib
  25. }
  26. func emailSafe(port int) string {
  27. return string(rune('a'+port%26)) + string(rune('a'+(port/26)%26))
  28. }
  29. func flowOf(t *testing.T, svc *ClientService, email string) string {
  30. t.Helper()
  31. rec, err := svc.GetRecordByEmail(nil, email)
  32. if err != nil {
  33. t.Fatalf("GetRecordByEmail(%q): %v", email, err)
  34. }
  35. return rec.Flow
  36. }
  37. const (
  38. realityStream = `{"network":"tcp","security":"reality"}`
  39. wsStream = `{"network":"ws","security":"none"}`
  40. )
  41. // TestBulkAdjust_FlowSetAndClear covers the happy path: a vision flow is applied
  42. // on an eligible VLESS inbound and later cleared with the "none" directive. Both
  43. // transitions are real config changes, so they must request a restart.
  44. func TestBulkAdjust_FlowSetAndClear(t *testing.T) {
  45. setupBulkDB(t)
  46. svc := &ClientService{}
  47. inboundSvc := &InboundService{}
  48. clients := []model.Client{
  49. {Email: "f1@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "f1", Enable: true},
  50. {Email: "f2@x", ID: "22222222-2222-2222-2222-222222222222", SubID: "f2", Enable: true},
  51. }
  52. ib := mkInboundStream(t, 30001, model.VLESS, clientsSettings(t, clients), realityStream)
  53. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  54. t.Fatalf("seed: %v", err)
  55. }
  56. emails := emailsOf(clients)
  57. // Set vision flow.
  58. res, restart, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "xtls-rprx-vision-udp443")
  59. if err != nil {
  60. t.Fatalf("BulkAdjust set: %v", err)
  61. }
  62. if res.Adjusted != 2 {
  63. t.Fatalf("expected 2 adjusted, got %d (skipped=%v)", res.Adjusted, res.Skipped)
  64. }
  65. if !restart {
  66. t.Fatalf("setting flow should request a restart")
  67. }
  68. for _, e := range emails {
  69. if got := flowOf(t, svc, e); got != "xtls-rprx-vision-udp443" {
  70. t.Fatalf("%s flow = %q, want xtls-rprx-vision-udp443", e, got)
  71. }
  72. }
  73. // Setting the same flow again is a no-op: honored (counted) but no restart.
  74. if _, restart2, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "xtls-rprx-vision-udp443"); err != nil {
  75. t.Fatalf("BulkAdjust idempotent: %v", err)
  76. } else if restart2 {
  77. t.Fatalf("re-setting identical flow should not request a restart")
  78. }
  79. // Clear flow.
  80. cres, crestart, err := svc.BulkAdjust(inboundSvc, emails, 0, 0, "none")
  81. if err != nil {
  82. t.Fatalf("BulkAdjust clear: %v", err)
  83. }
  84. if cres.Adjusted != 2 {
  85. t.Fatalf("expected 2 cleared, got %d (skipped=%v)", cres.Adjusted, cres.Skipped)
  86. }
  87. if !crestart {
  88. t.Fatalf("clearing flow should request a restart")
  89. }
  90. for _, e := range emails {
  91. if got := flowOf(t, svc, e); got != "" {
  92. t.Fatalf("%s flow = %q, want empty after clear", e, got)
  93. }
  94. }
  95. }
  96. // TestBulkAdjust_FlowIneligibleSkipped verifies a vision flow is refused on an
  97. // inbound that cannot carry it (ws transport), reported as skipped, and the
  98. // client's flow is left untouched.
  99. func TestBulkAdjust_FlowIneligibleSkipped(t *testing.T) {
  100. setupBulkDB(t)
  101. svc := &ClientService{}
  102. inboundSvc := &InboundService{}
  103. clients := []model.Client{
  104. {Email: "ws1@x", ID: "33333333-3333-3333-3333-333333333333", SubID: "ws1", Enable: true},
  105. }
  106. ib := mkInboundStream(t, 30101, model.VLESS, clientsSettings(t, clients), wsStream)
  107. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  108. t.Fatalf("seed: %v", err)
  109. }
  110. res, restart, err := svc.BulkAdjust(inboundSvc, []string{"ws1@x"}, 0, 0, "xtls-rprx-vision")
  111. if err != nil {
  112. t.Fatalf("BulkAdjust: %v", err)
  113. }
  114. if res.Adjusted != 0 {
  115. t.Fatalf("ineligible inbound should adjust nothing, got %d", res.Adjusted)
  116. }
  117. if restart {
  118. t.Fatalf("no change should not request a restart")
  119. }
  120. if len(res.Skipped) != 1 || res.Skipped[0].Email != "ws1@x" {
  121. t.Fatalf("expected ws1@x in skipped, got %v", res.Skipped)
  122. }
  123. if got := flowOf(t, svc, "ws1@x"); got != "" {
  124. t.Fatalf("flow should stay empty on ineligible inbound, got %q", got)
  125. }
  126. }
  127. // TestBulkAdjust_NoDirectiveErrors guards the relaxed precondition: with no
  128. // days, traffic, or flow set there is nothing to do.
  129. func TestBulkAdjust_NoDirectiveErrors(t *testing.T) {
  130. setupBulkDB(t)
  131. svc := &ClientService{}
  132. inboundSvc := &InboundService{}
  133. if _, _, err := svc.BulkAdjust(inboundSvc, []string{"any@x"}, 0, 0, ""); err == nil {
  134. t.Fatalf("expected error when no adjustment is specified")
  135. }
  136. // An unknown flow directive is ignored (treated as ""), so it also errors.
  137. if _, _, err := svc.BulkAdjust(inboundSvc, []string{"any@x"}, 0, 0, "bogus-flow"); err == nil {
  138. t.Fatalf("unknown flow should be ignored and error like an empty directive")
  139. }
  140. }
  141. // TestBulkAdjust_DaysApplyDespiteIneligibleFlow is the regression for the review
  142. // blocker: when a client on a flow-ineligible inbound is adjusted with BOTH a
  143. // days/traffic delta AND a flow directive, the days/traffic change must still be
  144. // persisted to ClientTraffic (not just the inbound JSON / ClientRecord) and the
  145. // client must count as adjusted, while the unhonored flow is reported separately.
  146. func TestBulkAdjust_DaysApplyDespiteIneligibleFlow(t *testing.T) {
  147. setupBulkDB(t)
  148. svc := &ClientService{}
  149. inboundSvc := &InboundService{}
  150. const day = int64(24 * 60 * 60 * 1000)
  151. const gb = int64(1) << 30
  152. baseExpiry := time.Now().UnixMilli() + 30*day
  153. baseTotal := 10 * gb
  154. clients := []model.Client{
  155. {Email: "mix@x", ID: "44444444-4444-4444-4444-444444444444", SubID: "mix", Enable: true, ExpiryTime: baseExpiry, TotalGB: baseTotal},
  156. }
  157. ib := mkInboundStream(t, 30201, model.VLESS, clientsSettings(t, clients), wsStream)
  158. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  159. t.Fatalf("seed: %v", err)
  160. }
  161. // ClientTraffic is the store the enforcement job reads; seed it to match.
  162. if err := database.GetDB().Create(&xray.ClientTraffic{Email: "mix@x", Enable: true, ExpiryTime: baseExpiry, Total: baseTotal}).Error; err != nil {
  163. t.Fatalf("seed traffic: %v", err)
  164. }
  165. res, _, err := svc.BulkAdjust(inboundSvc, []string{"mix@x"}, 7, gb, "xtls-rprx-vision")
  166. if err != nil {
  167. t.Fatalf("BulkAdjust: %v", err)
  168. }
  169. if res.Adjusted != 1 {
  170. t.Fatalf("days/traffic should still be applied: Adjusted=%d skipped=%v", res.Adjusted, res.Skipped)
  171. }
  172. if len(res.Skipped) != 1 || res.Skipped[0].Email != "mix@x" {
  173. t.Fatalf("expected mix@x reported for the unhonored flow, got %v", res.Skipped)
  174. }
  175. wantExpiry := baseExpiry + 7*day
  176. wantTotal := baseTotal + gb
  177. // ClientRecord (inbound-derived) advanced.
  178. if rec, err := svc.GetRecordByEmail(nil, "mix@x"); err != nil {
  179. t.Fatalf("record: %v", err)
  180. } else if rec.ExpiryTime != wantExpiry || rec.TotalGB != wantTotal {
  181. t.Fatalf("ClientRecord not advanced: expiry=%d total=%d", rec.ExpiryTime, rec.TotalGB)
  182. }
  183. // ClientTraffic advanced in lockstep — no divergence.
  184. var ct xray.ClientTraffic
  185. if err := database.GetDB().Where("email = ?", "mix@x").First(&ct).Error; err != nil {
  186. t.Fatalf("traffic row: %v", err)
  187. }
  188. if ct.ExpiryTime != wantExpiry || ct.Total != wantTotal {
  189. t.Fatalf("ClientTraffic diverged: expiry=%d total=%d, want expiry=%d total=%d", ct.ExpiryTime, ct.Total, wantExpiry, wantTotal)
  190. }
  191. // Flow left untouched on the ineligible inbound.
  192. if got := flowOf(t, svc, "mix@x"); got != "" {
  193. t.Fatalf("flow should stay empty on ineligible inbound, got %q", got)
  194. }
  195. }