1
0

node_bulk_dispatch_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync/atomic"
  6. "testing"
  7. "github.com/google/uuid"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  11. )
  12. // fakeNodeRuntime is a runtime.Runtime stub that counts the per-client dispatch
  13. // calls so a test can assert a bulk op does NOT stream one RPC per client.
  14. type fakeNodeRuntime struct {
  15. addClient atomic.Int32
  16. deleteUser atomic.Int32
  17. updateUser atomic.Int32
  18. }
  19. func (f *fakeNodeRuntime) Name() string { return "fake-node" }
  20. func (f *fakeNodeRuntime) AddInbound(context.Context, *model.Inbound) error { return nil }
  21. func (f *fakeNodeRuntime) DelInbound(context.Context, *model.Inbound) error { return nil }
  22. func (f *fakeNodeRuntime) UpdateInbound(context.Context, *model.Inbound, *model.Inbound) error {
  23. return nil
  24. }
  25. func (f *fakeNodeRuntime) AddUser(context.Context, *model.Inbound, map[string]any) error { return nil }
  26. func (f *fakeNodeRuntime) RemoveUser(context.Context, *model.Inbound, string) error { return nil }
  27. func (f *fakeNodeRuntime) UpdateUser(context.Context, *model.Inbound, string, model.Client) error {
  28. f.updateUser.Add(1)
  29. return nil
  30. }
  31. func (f *fakeNodeRuntime) DeleteUser(context.Context, *model.Inbound, string) error {
  32. f.deleteUser.Add(1)
  33. return nil
  34. }
  35. func (f *fakeNodeRuntime) AddClient(context.Context, *model.Inbound, model.Client) error {
  36. f.addClient.Add(1)
  37. return nil
  38. }
  39. func (f *fakeNodeRuntime) RestartXray(context.Context) error { return nil }
  40. func (f *fakeNodeRuntime) ResetClientTraffic(context.Context, *model.Inbound, string) error {
  41. return nil
  42. }
  43. func (f *fakeNodeRuntime) ResetInboundTraffic(context.Context, *model.Inbound) error { return nil }
  44. func (f *fakeNodeRuntime) ResetAllTraffics(context.Context) error { return nil }
  45. // setupNodeRuntime wires an online node + a fake runtime override and returns the
  46. // node id and the fake so a test can drive the service node-dispatch path without
  47. // a network node.
  48. func setupNodeRuntime(t *testing.T) (int, *fakeNodeRuntime) {
  49. t.Helper()
  50. prev := runtime.GetManager()
  51. mgr := runtime.NewManager(runtime.LocalDeps{APIPort: func() int { return 0 }, SetNeedRestart: func() {}})
  52. runtime.SetManager(mgr)
  53. t.Cleanup(func() { runtime.SetManager(prev) })
  54. node := &model.Node{Name: "n1", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"}
  55. if err := database.GetDB().Create(node).Error; err != nil {
  56. t.Fatalf("create node: %v", err)
  57. }
  58. fake := &fakeNodeRuntime{}
  59. mgr.SetRuntimeOverride(node.Id, fake)
  60. return node.Id, fake
  61. }
  62. func nodeInbound(t *testing.T, nodeID, port int, clients []model.Client) *model.Inbound {
  63. t.Helper()
  64. if clients == nil {
  65. clients = []model.Client{}
  66. }
  67. ib := &model.Inbound{
  68. UserId: 1, NodeID: &nodeID, Tag: fmt.Sprintf("in-%d", port), Enable: true,
  69. Port: port, Protocol: model.VLESS, Settings: clientsSettings(t, clients),
  70. }
  71. if err := database.GetDB().Create(ib).Error; err != nil {
  72. t.Fatalf("create node inbound: %v", err)
  73. }
  74. if err := (&ClientService{}).SyncInbound(nil, ib.Id, clients); err != nil {
  75. t.Fatalf("seed SyncInbound: %v", err)
  76. }
  77. return ib
  78. }
  79. func makeNodeClients(n int) []model.Client {
  80. out := make([]model.Client, n)
  81. for i := range n {
  82. out[i] = model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("nu-%05d@x", i), Enable: true}
  83. }
  84. return out
  85. }
  86. // TestNodeBulk_LargeAddFoldsToDirty: adding more than the threshold of clients to
  87. // an online node inbound must NOT stream one AddClient RPC per client; it marks
  88. // the node dirty so a single reconcile push converges it instead.
  89. func TestNodeBulk_LargeAddFoldsToDirty(t *testing.T) {
  90. setupBulkDB(t)
  91. nodeID, fake := setupNodeRuntime(t)
  92. ib := nodeInbound(t, nodeID, 30001, nil)
  93. svc := &ClientService{}
  94. inboundSvc := &InboundService{}
  95. add := makeNodeClients(nodeBulkPushThreshold + 10)
  96. if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil {
  97. t.Fatalf("AddInboundClient: %v", err)
  98. }
  99. if got := fake.addClient.Load(); got != 0 {
  100. t.Fatalf("large add streamed %d AddClient RPCs, want 0 (should fold to dirty)", got)
  101. }
  102. if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
  103. t.Fatalf("NodeSyncState: %v", err)
  104. } else if !dirty {
  105. t.Fatal("large add must mark the node dirty")
  106. }
  107. }
  108. // TestNodeBulk_SmallAddPushesLive: a small add stays on the live per-client path.
  109. func TestNodeBulk_SmallAddPushesLive(t *testing.T) {
  110. setupBulkDB(t)
  111. nodeID, fake := setupNodeRuntime(t)
  112. ib := nodeInbound(t, nodeID, 30002, nil)
  113. svc := &ClientService{}
  114. inboundSvc := &InboundService{}
  115. const small = 3
  116. add := makeNodeClients(small)
  117. if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil {
  118. t.Fatalf("AddInboundClient: %v", err)
  119. }
  120. if got := fake.addClient.Load(); got != int32(small) {
  121. t.Fatalf("small add streamed %d AddClient RPCs, want %d", got, small)
  122. }
  123. }
  124. func TestNodeUpdateInboundClientNoopSkipsRuntimeAndDirty(t *testing.T) {
  125. setupBulkDB(t)
  126. nodeID, fake := setupNodeRuntime(t)
  127. client := model.Client{
  128. ID: uuid.NewString(),
  129. Email: "noop@x",
  130. SubID: "sub-noop",
  131. Enable: true,
  132. CreatedAt: 111,
  133. UpdatedAt: 222,
  134. }
  135. ib := nodeInbound(t, nodeID, 30020, []model.Client{client})
  136. svc := &ClientService{}
  137. inboundSvc := &InboundService{}
  138. if _, err := svc.UpdateInboundClient(inboundSvc, &model.Inbound{
  139. Id: ib.Id,
  140. Protocol: model.VLESS,
  141. Settings: clientsSettings(t, []model.Client{client}),
  142. }, client.Email); err != nil {
  143. t.Fatalf("UpdateInboundClient: %v", err)
  144. }
  145. if got := fake.updateUser.Load(); got != 0 {
  146. t.Fatalf("no-op update streamed %d UpdateUser RPCs, want 0", got)
  147. }
  148. if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
  149. t.Fatalf("NodeSyncState: %v", err)
  150. } else if dirty {
  151. t.Fatal("no-op update must not mark the node dirty")
  152. }
  153. reloaded, err := inboundSvc.GetInbound(ib.Id)
  154. if err != nil {
  155. t.Fatalf("GetInbound: %v", err)
  156. }
  157. if reloaded.Settings != ib.Settings {
  158. t.Fatal("no-op update rewrote inbound settings")
  159. }
  160. }
  161. func TestNodeUpdateInboundClientLivePushKeepsDirtyBackup(t *testing.T) {
  162. setupBulkDB(t)
  163. nodeID, fake := setupNodeRuntime(t)
  164. client := model.Client{
  165. ID: uuid.NewString(),
  166. Email: "edit@x",
  167. SubID: "sub-edit",
  168. Enable: true,
  169. CreatedAt: 111,
  170. UpdatedAt: 222,
  171. }
  172. ib := nodeInbound(t, nodeID, 30021, []model.Client{client})
  173. edited := client
  174. edited.Comment = "changed"
  175. svc := &ClientService{}
  176. inboundSvc := &InboundService{}
  177. if _, err := svc.UpdateInboundClient(inboundSvc, &model.Inbound{
  178. Id: ib.Id,
  179. Protocol: model.VLESS,
  180. Settings: clientsSettings(t, []model.Client{edited}),
  181. }, client.Email); err != nil {
  182. t.Fatalf("UpdateInboundClient: %v", err)
  183. }
  184. if got := fake.updateUser.Load(); got != 1 {
  185. t.Fatalf("edit streamed %d UpdateUser RPCs, want 1", got)
  186. }
  187. if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
  188. t.Fatalf("NodeSyncState: %v", err)
  189. } else if !dirty {
  190. t.Fatal("successful live update should keep node dirty as reconcile backup")
  191. }
  192. }
  193. // TestNodeBulk_LargeDeleteFoldsToDirty: deleting more than the threshold from an
  194. // online node inbound must fold into a reconcile rather than per-client deletes.
  195. func TestNodeBulk_LargeDeleteFoldsToDirty(t *testing.T) {
  196. setupBulkDB(t)
  197. nodeID, fake := setupNodeRuntime(t)
  198. seed := makeNodeClients(nodeBulkPushThreshold + 10)
  199. nodeInbound(t, nodeID, 30003, seed)
  200. svc := &ClientService{}
  201. inboundSvc := &InboundService{}
  202. emails := make([]string, len(seed))
  203. for i := range seed {
  204. emails[i] = seed[i].Email
  205. }
  206. if _, _, err := svc.BulkDelete(inboundSvc, emails, false); err != nil {
  207. t.Fatalf("BulkDelete: %v", err)
  208. }
  209. if got := fake.deleteUser.Load(); got != 0 {
  210. t.Fatalf("large delete streamed %d DeleteUser RPCs, want 0 (should fold to dirty)", got)
  211. }
  212. if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
  213. t.Fatalf("NodeSyncState: %v", err)
  214. } else if !dirty {
  215. t.Fatal("large delete must mark the node dirty")
  216. }
  217. }